1996fanrui commented on code in PR #1820:
URL: 
https://github.com/apache/incubator-streampark/pull/1820#discussion_r993131873


##########
streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala:
##########
@@ -127,50 +127,62 @@ private[flink] class FlinkTableInitializer(args: 
Array[String], apiType: ApiType
     localTableEnv
   }
 
-
   /**
    * In case of table SQL, the parameter conf is not required, it depends on 
the developer.
    */
+
   override def initParameter(): (ParameterTool, Configuration) = {
-    val (userParameter: ParameterTool, flinkConf: Configuration) = 
super.initParameter()
-    (userParameter.get(KEY_FLINK_SQL()) match {
-      case null => userParameter
+    val (appParameter: ParameterTool, flinkConf: Configuration) = {
+      val argsMap = ParameterTool.fromArgs(args)
+      argsMap.get(KEY_APP_CONF(), null) match {
+        case null | "" =>
+          logWarn("Usage:can't fond config,you can set \"--conf $path \" in 
main arguments")
+          ParameterTool.fromSystemProperties().mergeWith(argsMap) -> new 
Configuration()
+        case file => super.parseConfig(file)
+      }
+    }
+
+    val appParam = appParameter.get(KEY_FLINK_SQL()) match {
+      case null => appParameter
       case param =>
         // for streampark-console
         Try(DeflaterUtils.unzipString(param)) match {
-          case Success(value) => 
userParameter.mergeWith(ParameterTool.fromMap(Map(KEY_FLINK_SQL() -> value)))
+          case Success(value) => 
appParameter.mergeWith(ParameterTool.fromMap(Map(KEY_FLINK_SQL() -> value)))
           case Failure(_) =>
             val sqlFile = new File(param)
             Try(PropertiesUtils.fromYamlFile(sqlFile.getAbsolutePath)) match {
-              case Success(value) => 
userParameter.mergeWith(ParameterTool.fromMap(value))
+              case Success(value) => 
appParameter.mergeWith(ParameterTool.fromMap(value))
               case Failure(e) =>
                 new IllegalArgumentException(s"[StreamPark] init sql error.$e")
-                userParameter
+                appParameter
             }
         }
-    }, flinkConf)
+    }
+
+    appParam -> flinkConf
   }
 
   def initEnvironment(tableMode: TableMode): Unit = {
     val builder = EnvironmentSettings.newInstance()
-    val plannerType = 
Try(PlannerType.withName(userParameter.get(KEY_FLINK_TABLE_PLANNER))).getOrElse 
{
-      logWarn(s" $KEY_FLINK_TABLE_PLANNER undefined,use default by: 
blinkPlanner")
-      PlannerType.blink
-    }
-
-    plannerType match {
-      case PlannerType.blink =>
-        logInfo("blinkPlanner will be use.")
-        builder.useBlinkPlanner()
-      case PlannerType.old =>
-        logInfo("oldPlanner will be use.")
-        builder.useOldPlanner()
-      case PlannerType.any =>
-        logInfo("anyPlanner will be use.")
-        builder.useAnyPlanner()
+    val plannerType = 
Try(PlannerType.withName(parameter.get(KEY_FLINK_TABLE_PLANNER))).getOrElse(PlannerType.blink)
+
+    try {
+      plannerType match {
+        case PlannerType.blink =>
+          logInfo("blinkPlanner will be use.")
+          builder.useBlinkPlanner()
+        case PlannerType.old =>
+          logInfo("oldPlanner will be use.")
+          builder.useOldPlanner()
+        case PlannerType.any =>
+          logInfo("anyPlanner will be use.")
+          builder.useAnyPlanner()
+      }
+    } catch {
+      case e: IncompatibleClassChangeError =>

Review Comment:
   Since Flink 1.14, the blink planner is the default planner. So Flink 1.12 and 
1.13 have the `useBlinkPlanner` method, but Flink 1.14 and 1.15 don't have the 
method.
   
   Currently, this PR uses a `try`/`catch` on `IncompatibleClassChangeError` to 
work around this incompatibility. From a coding-standards perspective, this is 
not reasonable, and in the long term it will be very difficult to maintain.
   
   
   We need a general coding convention: how should we adapt methods that are 
incompatible across Flink versions?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to