This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 087302ed6 [Feature] Distinguish between Flink parameters and user 
parameters when starting a Flink job (#1774)
087302ed6 is described below

commit 087302ed675505d5bd021f18cedf0111d466713b
Author: 1996fanrui <[email protected]>
AuthorDate: Sat Oct 8 22:50:55 2022 +0800

    [Feature] Distinguish between Flink parameters and user parameters when 
starting a Flink job (#1774)
---
 .../src/main/resources/flink-application.conf      | 36 ++++++--------
 .../flink/core/FlinkStreamingInitializer.scala     | 54 ++++++++++-----------
 .../flink/core/FlinkTableInitializer.scala         | 55 +++++++++------------
 .../flink/core/FlinkTableInitializer.scala         | 56 +++++++++-------------
 .../flink/core/FlinkTableInitializer.scala         | 56 +++++++++-------------
 .../flink/core/FlinkTableInitializer.scala         | 56 +++++++++-------------
 6 files changed, 131 insertions(+), 182 deletions(-)

diff --git 
a/streampark-console/streampark-console-service/src/main/resources/flink-application.conf
 
b/streampark-console/streampark-console-service/src/main/resources/flink-application.conf
index 4f26c689c..bde9400de 100644
--- 
a/streampark-console/streampark-console-service/src/main/resources/flink-application.conf
+++ 
b/streampark-console/streampark-console-service/src/main/resources/flink-application.conf
@@ -44,6 +44,21 @@ flink:
         managed.fraction: 0.4
       pipeline:
         auto-watermark-interval: 200ms
+      # checkpoint
+      execution:
+        checkpointing:
+          mode: EXACTLY_ONCE
+          interval: 30s
+          timeout: 10min
+          unaligned: false
+          externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
+      # state backend
+      state:
+        backend: hashmap # Special note: Flink 1.12 accepts 'jobmanager', 'filesystem' or 'rocksdb'; Flink 1.13+ accepts 'hashmap' or 'rocksdb'
+        backend.incremental: true
+        checkpoint-storage: filesystem
+        savepoints.dir: file:///tmp/chkdir
+        checkpoints.dir: file:///tmp/chkdir
       # restart strategy
       restart-strategy: fixed-delay  # Restart strategy 
[(fixed-delay|failure-rate|none) a total of 3 configurable strategies]
       restart-strategy.fixed-delay:
@@ -53,27 +68,6 @@ flink:
         max-failures-per-interval:
         failure-rate-interval:
         delay:
-  checkpoints:
-    enable: true
-    interval: 30000
-    mode: EXACTLY_ONCE
-    timeout: 300000
-    unaligned: true
-  # state backend
-  state:
-    # Note that the configurations of flink1.12 and later are different, and 
the combined configuration should be selected reasonably
-    backend: # see 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/state/state_backends.html
-      value: filesystem # Special note: flink1.12 optional configuration 
('jobmanager', 'filesystem', 'rocksdb'), flink1.12+ optional configuration 
('hashmap', 'rocksdb'),
-      memory: 5242880 # Effective for jobmanager, maximum memory
-      async: false    # Valid for (jobmanager, filesystem), whether to enable 
asynchronous
-      incremental: true # Valid for rocksdb, whether to enable incremental
-      # Configuration reference of rocksdb 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html#rocksdb-state-backend
-      # Remove the prefix of rocksdb configuration key: state.backend
-      #rocksdb.block.blocksize:
-    checkpoint-storage: filesystem # Special note: This parameter is only 
valid in flink 1.12+, and the state.backend.value is hashmap, optional: 
(jobmanager | filesystem)
-    checkpoints.dir: file:///tmp/chkdir
-    savepoints.dir: file:///tmp/chkdir
-    checkpoints.num-retained: 1
   # table
   table:
     planner: blink # (blink|old|any)
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
index 16e2c9d21..c41d0c0be 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
@@ -20,9 +20,10 @@ import org.apache.streampark.common.conf.ConfigConst._
 import org.apache.streampark.common.enums.ApiType
 import org.apache.streampark.common.enums.ApiType.ApiType
 import org.apache.streampark.common.util._
-
 import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment 
=> JavaStreamEnv}
 import org.apache.flink.table.api.TableConfig
 
 import java.io.File
@@ -44,7 +45,7 @@ private[flink] object FlinkStreamingInitializer {
         }
       }
     }
-    (flinkInitializer.parameter, flinkInitializer.streamEnvironment)
+    (flinkInitializer.userParameter, flinkInitializer.streamEnvironment)
   }
 
   def initialize(args: StreamEnvConfig): (ParameterTool, 
StreamExecutionEnvironment) = {
@@ -57,7 +58,7 @@ private[flink] object FlinkStreamingInitializer {
         }
       }
     }
-    (flinkInitializer.parameter, flinkInitializer.streamEnvironment)
+    (flinkInitializer.userParameter, flinkInitializer.streamEnvironment)
   }
 }
 
@@ -72,29 +73,26 @@ private[flink] class FlinkStreamingInitializer(args: 
Array[String], apiType: Api
 
   var javaTableEnvConfFunc: TableEnvConfigFunction = _
 
-  lazy val parameter: ParameterTool = initParameter()
+  lazy val (userParameter: ParameterTool, flinkConf: Configuration) = 
initParameter()
 
   private[this] var localStreamEnv: StreamExecutionEnvironment = _
 
-  private[this] lazy val defaultFlinkConf: Map[String, String] = {
-    parameter.get(KEY_FLINK_CONF(), null) match {
-      case null =>
-        // start with script
-        val flinkHome = System.getenv("FLINK_HOME")
-        require(flinkHome != null, "FLINK_HOME not found.")
-        logInfo(s"flinkHome: $flinkHome")
-        val yaml = new File(s"$flinkHome/conf/flink-conf.yaml")
-        PropertiesUtils.loadFlinkConfYaml(yaml)
-      case flinkConf =>
-        // passed in from the streampark console backend
-        PropertiesUtils.loadFlinkConfYaml(DeflaterUtils.unzipString(flinkConf))
-    }
+  def readUserAndFlinkConf(config: String): (Map[String, String], Map[String, 
String]) = {
+    val allConf = readConf(config)
+    val userConf = allConf
+      .filter(!_._1.startsWith(KEY_FLINK_DEPLOYMENT_OPTION_PREFIX))
+      .map(x => x._1.replace(KEY_FLINK_DEPLOYMENT_PROPERTY_PREFIX, "") -> x._2)
+
+    val flinkConf = allConf
+      .filter(_._1.startsWith(KEY_FLINK_DEPLOYMENT_PROPERTY_PREFIX))
+      .map(x => x._1.replace(KEY_FLINK_DEPLOYMENT_PROPERTY_PREFIX, "") -> x._2)
+    (userConf, flinkConf)
   }
 
-  def readFlinkConf(config: String): Map[String, String] = {
+  def readConf(config: String): Map[String, String] = {
     val extension = config.split("\\.").last.toLowerCase
 
-    val map = config match {
+    config match {
       case x if x.startsWith("yaml://") =>
         PropertiesUtils.fromYamlText(DeflaterUtils.unzipString(x.drop(7)))
       case x if x.startsWith("prop://") =>
@@ -119,13 +117,9 @@ private[flink] class FlinkStreamingInitializer(args: 
Array[String], apiType: Api
           case _ => throw new IllegalArgumentException("[StreamPark] 
Usage:flink.conf file error,must be properties or yml")
         }
     }
-
-    map
-      .filter(!_._1.startsWith(KEY_FLINK_DEPLOYMENT_OPTION_PREFIX))
-      .map(x => x._1.replace(KEY_FLINK_DEPLOYMENT_PROPERTY_PREFIX, "") -> x._2)
   }
 
-  def initParameter(): ParameterTool = {
+  def initParameter(): (ParameterTool, Configuration) = {
     val argsMap = ParameterTool.fromArgs(args)
     val config = argsMap.get(KEY_APP_CONF(), null) match {
       // scalastyle:off throwerror
@@ -133,9 +127,9 @@ private[flink] class FlinkStreamingInitializer(args: 
Array[String], apiType: Api
       // scalastyle:on throwerror
       case file => file
     }
-    val configArgs = readFlinkConf(config)
+    val (userConf, flinkConf) = readUserAndFlinkConf(config)
     // config priority: explicitly specified priority > project profiles > 
system profiles
-    
ParameterTool.fromSystemProperties().mergeWith(ParameterTool.fromMap(configArgs)).mergeWith(argsMap)
+    
(ParameterTool.fromSystemProperties().mergeWith(ParameterTool.fromMap(userConf)).mergeWith(argsMap),
 Configuration.fromMap(flinkConf))
   }
 
   def streamEnvironment: StreamExecutionEnvironment = {
@@ -150,14 +144,14 @@ private[flink] class FlinkStreamingInitializer(args: 
Array[String], apiType: Api
   }
 
   def initEnvironment(): Unit = {
-    localStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment
+    localStreamEnv = new 
StreamExecutionEnvironment(JavaStreamEnv.getExecutionEnvironment(flinkConf))
 
     apiType match {
-      case ApiType.java if javaStreamEnvConfFunc != null => 
javaStreamEnvConfFunc.configuration(localStreamEnv.getJavaEnv, parameter)
-      case ApiType.scala if streamEnvConfFunc != null => 
streamEnvConfFunc(localStreamEnv, parameter)
+      case ApiType.java if javaStreamEnvConfFunc != null => 
javaStreamEnvConfFunc.configuration(localStreamEnv.getJavaEnv, userParameter)
+      case ApiType.scala if streamEnvConfFunc != null => 
streamEnvConfFunc(localStreamEnv, userParameter)
       case _ =>
     }
-    localStreamEnv.getConfig.setGlobalJobParameters(parameter)
+    localStreamEnv.getConfig.setGlobalJobParameters(userParameter)
   }
 
 }
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
index f0d097004..7838710ad 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
@@ -22,7 +22,7 @@ import org.apache.streampark.common.enums.TableMode.TableMode
 import org.apache.streampark.common.enums.{ApiType, PlannerType, TableMode}
 import org.apache.streampark.common.util.{DeflaterUtils, PropertiesUtils}
 import org.apache.flink.api.java.utils.ParameterTool
-import org.apache.flink.configuration.PipelineOptions
+import org.apache.flink.configuration.{Configuration, PipelineOptions}
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
 import org.apache.flink.table.api.{EnvironmentSettings, TableConfig, 
TableEnvironment}
@@ -48,7 +48,7 @@ private[flink] object FlinkTableInitializer {
         }
       }
     }
-    (flinkInitializer.parameter, flinkInitializer.tableEnvironment)
+    (flinkInitializer.userParameter, flinkInitializer.tableEnvironment)
   }
 
   def initialize(args: TableEnvConfig): (ParameterTool, TableEnvironment) = {
@@ -61,7 +61,7 @@ private[flink] object FlinkTableInitializer {
         }
       }
     }
-    (flinkInitializer.parameter, flinkInitializer.tableEnvironment)
+    (flinkInitializer.userParameter, flinkInitializer.tableEnvironment)
   }
 
   def initialize(args: Array[String],
@@ -78,7 +78,7 @@ private[flink] object FlinkTableInitializer {
         }
       }
     }
-    (flinkInitializer.parameter, flinkInitializer.streamEnvironment, 
flinkInitializer.streamTableEnvironment)
+    (flinkInitializer.userParameter, flinkInitializer.streamEnvironment, 
flinkInitializer.streamTableEnvironment)
   }
 
   def initialize(args: StreamTableEnvConfig):
@@ -93,7 +93,7 @@ private[flink] object FlinkTableInitializer {
         }
       }
     }
-    (flinkInitializer.parameter, flinkInitializer.streamEnvironment, 
flinkInitializer.streamTableEnvironment)
+    (flinkInitializer.userParameter, flinkInitializer.streamEnvironment, 
flinkInitializer.streamTableEnvironment)
   }
 
 }
@@ -131,38 +131,29 @@ private[flink] class FlinkTableInitializer(args: 
Array[String], apiType: ApiType
   /**
    * In case of table SQL, the parameter conf is not required, it depends on 
the developer.
    */
-  override def initParameter(): ParameterTool = {
-    val argsMap = ParameterTool.fromArgs(args)
-    val parameter = argsMap.get(KEY_APP_CONF(), null) match {
-      case null | "" =>
-        logWarn("Usage:can't fond config,you can set \"--conf $path \" in main 
arguments")
-        ParameterTool.fromSystemProperties().mergeWith(argsMap)
-      case file =>
-        val configArgs = super.readFlinkConf(file)
-        // config priority: explicitly specified priority > project profiles > 
system profiles
-        
ParameterTool.fromSystemProperties().mergeWith(ParameterTool.fromMap(configArgs)).mergeWith(argsMap)
-    }
-    parameter.get(KEY_FLINK_SQL()) match {
-      case null => parameter
+  override def initParameter(): (ParameterTool, Configuration) = {
+    val (userParameter: ParameterTool, flinkConf: Configuration) = 
super.initParameter()
+    (userParameter.get(KEY_FLINK_SQL()) match {
+      case null => userParameter
       case param =>
         // for streampark-console
         Try(DeflaterUtils.unzipString(param)) match {
-          case Success(value) => 
parameter.mergeWith(ParameterTool.fromMap(Map(KEY_FLINK_SQL() -> value)))
+          case Success(value) => 
userParameter.mergeWith(ParameterTool.fromMap(Map(KEY_FLINK_SQL() -> value)))
           case Failure(_) =>
             val sqlFile = new File(param)
             Try(PropertiesUtils.fromYamlFile(sqlFile.getAbsolutePath)) match {
-              case Success(value) => 
parameter.mergeWith(ParameterTool.fromMap(value))
+              case Success(value) => 
userParameter.mergeWith(ParameterTool.fromMap(value))
               case Failure(e) =>
                 new IllegalArgumentException(s"[StreamPark] init sql error.$e")
-                parameter
+                userParameter
             }
         }
-    }
+    }, flinkConf)
   }
 
   def initEnvironment(tableMode: TableMode): Unit = {
     val builder = EnvironmentSettings.newInstance()
-    val plannerType = 
Try(PlannerType.withName(parameter.get(KEY_FLINK_TABLE_PLANNER))).getOrElse {
+    val plannerType = 
Try(PlannerType.withName(userParameter.get(KEY_FLINK_TABLE_PLANNER))).getOrElse 
{
       logWarn(s" $KEY_FLINK_TABLE_PLANNER undefined,use default by: 
blinkPlanner")
       PlannerType.blink
     }
@@ -179,7 +170,7 @@ private[flink] class FlinkTableInitializer(args: 
Array[String], apiType: ApiType
         builder.useAnyPlanner()
     }
 
-    val mode = 
Try(TableMode.withName(parameter.get(KEY_FLINK_TABLE_MODE))).getOrElse(tableMode)
+    val mode = 
Try(TableMode.withName(userParameter.get(KEY_FLINK_TABLE_MODE))).getOrElse(tableMode)
     mode match {
       case TableMode.batch =>
         logInfo(s"components should work in $tableMode mode")
@@ -189,7 +180,7 @@ private[flink] class FlinkTableInitializer(args: 
Array[String], apiType: ApiType
         builder.inStreamingMode()
     }
 
-    val buildWith = (parameter.get(KEY_FLINK_TABLE_CATALOG), 
parameter.get(KEY_FLINK_TABLE_DATABASE))
+    val buildWith = (userParameter.get(KEY_FLINK_TABLE_CATALOG), 
userParameter.get(KEY_FLINK_TABLE_DATABASE))
     buildWith match {
       case (x: String, y: String) if x != null && y != null =>
         logInfo(s"with built in catalog: $x")
@@ -210,14 +201,14 @@ private[flink] class FlinkTableInitializer(args: 
Array[String], apiType: ApiType
       case TableMode.streaming =>
         initEnvironment()
         if (streamEnvConfFunc != null) {
-          streamEnvConfFunc(streamEnvironment, parameter)
+          streamEnvConfFunc(streamEnvironment, userParameter)
         }
         if (javaStreamEnvConfFunc != null) {
-          javaStreamEnvConfFunc.configuration(streamEnvironment.getJavaEnv, 
parameter)
+          javaStreamEnvConfFunc.configuration(streamEnvironment.getJavaEnv, 
userParameter)
         }
         localStreamTableEnv = StreamTableEnvironment.create(streamEnvironment, 
setting)
     }
-    val appName = (parameter.get(KEY_APP_NAME(), null), 
parameter.get(KEY_FLINK_APP_NAME, null)) match {
+    val appName = (userParameter.get(KEY_APP_NAME(), null), 
userParameter.get(KEY_FLINK_APP_NAME, null)) match {
       case (appName: String, _) => appName
       case (null, appName: String) => appName
       case _ => null
@@ -233,15 +224,15 @@ private[flink] class FlinkTableInitializer(args: 
Array[String], apiType: ApiType
       case ApiType.java =>
         if (javaTableEnvConfFunc != null) {
           tableMode match {
-            case TableMode.batch => 
javaTableEnvConfFunc.configuration(localTableEnv.getConfig, parameter)
-            case TableMode.streaming => 
javaTableEnvConfFunc.configuration(localStreamTableEnv.getConfig, parameter)
+            case TableMode.batch => 
javaTableEnvConfFunc.configuration(localTableEnv.getConfig, userParameter)
+            case TableMode.streaming => 
javaTableEnvConfFunc.configuration(localStreamTableEnv.getConfig, userParameter)
           }
         }
       case ApiType.scala =>
         if (tableConfFunc != null) {
           tableMode match {
-            case TableMode.batch => tableConfFunc(localTableEnv.getConfig, 
parameter)
-            case TableMode.streaming => 
tableConfFunc(localStreamTableEnv.getConfig, parameter)
+            case TableMode.batch => tableConfFunc(localTableEnv.getConfig, 
userParameter)
+            case TableMode.streaming => 
tableConfFunc(localStreamTableEnv.getConfig, userParameter)
           }
         }
     }
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
index 3f5b01dc9..9bcc1bc69 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
@@ -22,7 +22,7 @@ import org.apache.streampark.common.enums.TableMode.TableMode
 import org.apache.streampark.common.enums.{ApiType, PlannerType, TableMode}
 import org.apache.streampark.common.util.{DeflaterUtils, PropertiesUtils}
 import org.apache.flink.api.java.utils.ParameterTool
-import org.apache.flink.configuration.PipelineOptions
+import org.apache.flink.configuration.{Configuration, PipelineOptions}
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
 import org.apache.flink.table.api.{EnvironmentSettings, TableConfig, 
TableEnvironment}
@@ -48,7 +48,7 @@ private[flink] object FlinkTableInitializer {
         }
       }
     }
-    (flinkInitializer.parameter, flinkInitializer.tableEnvironment)
+    (flinkInitializer.userParameter, flinkInitializer.tableEnvironment)
   }
 
   def initialize(args: TableEnvConfig): (ParameterTool, TableEnvironment) = {
@@ -61,7 +61,7 @@ private[flink] object FlinkTableInitializer {
         }
       }
     }
-    (flinkInitializer.parameter, flinkInitializer.tableEnvironment)
+    (flinkInitializer.userParameter, flinkInitializer.tableEnvironment)
   }
 
   def initialize(args: Array[String],
@@ -78,7 +78,7 @@ private[flink] object FlinkTableInitializer {
         }
       }
     }
-    (flinkInitializer.parameter, flinkInitializer.streamEnvironment, 
flinkInitializer.streamTableEnvironment)
+    (flinkInitializer.userParameter, flinkInitializer.streamEnvironment, 
flinkInitializer.streamTableEnvironment)
   }
 
   def initialize(args: StreamTableEnvConfig):
@@ -93,7 +93,7 @@ private[flink] object FlinkTableInitializer {
         }
       }
     }
-    (flinkInitializer.parameter, flinkInitializer.streamEnvironment, 
flinkInitializer.streamTableEnvironment)
+    (flinkInitializer.userParameter, flinkInitializer.streamEnvironment, 
flinkInitializer.streamTableEnvironment)
   }
 
 }
@@ -130,39 +130,29 @@ private[flink] class FlinkTableInitializer(args: 
Array[String], apiType: ApiType
   /**
    * In case of table SQL, the parameter conf is not required, it depends on 
the developer.
    */
-  override def initParameter(): ParameterTool = {
-    val argsMap = ParameterTool.fromArgs(args)
-    val parameter = argsMap.get(KEY_APP_CONF(), null) match {
-      case null | "" =>
-        logWarn("Usage:can't fond config,you can set \"--conf $path \" in main 
arguments")
-        ParameterTool.fromSystemProperties().mergeWith(argsMap)
-      case file =>
-        val configArgs = super.readFlinkConf(file)
-        // config priority: explicitly specified priority > project profiles > 
system profiles
-        
ParameterTool.fromSystemProperties().mergeWith(ParameterTool.fromMap(configArgs)).mergeWith(argsMap)
-    }
-    parameter.get(KEY_FLINK_SQL()) match {
-      case null => parameter
+  override def initParameter(): (ParameterTool, Configuration) = {
+    val (userParameter: ParameterTool, flinkConf: Configuration) = 
super.initParameter()
+    (userParameter.get(KEY_FLINK_SQL()) match {
+      case null => userParameter
       case param =>
         // for streampark-console
         Try(DeflaterUtils.unzipString(param)) match {
-          case Success(value) => 
parameter.mergeWith(ParameterTool.fromMap(Map(KEY_FLINK_SQL() -> value)))
+          case Success(value) => 
userParameter.mergeWith(ParameterTool.fromMap(Map(KEY_FLINK_SQL() -> value)))
           case Failure(_) =>
             val sqlFile = new File(param)
             Try(PropertiesUtils.fromYamlFile(sqlFile.getAbsolutePath)) match {
-              case Success(value) => 
parameter.mergeWith(ParameterTool.fromMap(value))
+              case Success(value) => 
userParameter.mergeWith(ParameterTool.fromMap(value))
               case Failure(e) =>
                 new IllegalArgumentException(s"[StreamPark] init sql error.$e")
-                parameter
+                userParameter
             }
         }
-    }
-
+    }, flinkConf)
   }
 
   def initEnvironment(tableMode: TableMode): Unit = {
     val builder = EnvironmentSettings.newInstance()
-    val plannerType = 
Try(PlannerType.withName(parameter.get(KEY_FLINK_TABLE_PLANNER))).getOrElse {
+    val plannerType = 
Try(PlannerType.withName(userParameter.get(KEY_FLINK_TABLE_PLANNER))).getOrElse 
{
       logWarn(s" $KEY_FLINK_TABLE_PLANNER undefined,use default by: 
blinkPlanner")
       PlannerType.blink
     }
@@ -179,7 +169,7 @@ private[flink] class FlinkTableInitializer(args: 
Array[String], apiType: ApiType
         builder.useAnyPlanner()
     }
 
-    val mode = 
Try(TableMode.withName(parameter.get(KEY_FLINK_TABLE_MODE))).getOrElse(tableMode)
+    val mode = 
Try(TableMode.withName(userParameter.get(KEY_FLINK_TABLE_MODE))).getOrElse(tableMode)
     mode match {
       case TableMode.batch =>
         logInfo(s"components should work in $tableMode mode")
@@ -189,7 +179,7 @@ private[flink] class FlinkTableInitializer(args: 
Array[String], apiType: ApiType
         builder.inStreamingMode()
     }
 
-    val buildWith = (parameter.get(KEY_FLINK_TABLE_CATALOG), 
parameter.get(KEY_FLINK_TABLE_DATABASE))
+    val buildWith = (userParameter.get(KEY_FLINK_TABLE_CATALOG), 
userParameter.get(KEY_FLINK_TABLE_DATABASE))
     buildWith match {
       case (x: String, y: String) if x != null && y != null =>
         logInfo(s"with built in catalog: $x")
@@ -210,14 +200,14 @@ private[flink] class FlinkTableInitializer(args: 
Array[String], apiType: ApiType
       case TableMode.streaming =>
         initEnvironment()
         if (streamEnvConfFunc != null) {
-          streamEnvConfFunc(streamEnvironment, parameter)
+          streamEnvConfFunc(streamEnvironment, userParameter)
         }
         if (javaStreamEnvConfFunc != null) {
-          javaStreamEnvConfFunc.configuration(streamEnvironment.getJavaEnv, 
parameter)
+          javaStreamEnvConfFunc.configuration(streamEnvironment.getJavaEnv, 
userParameter)
         }
         localStreamTableEnv = StreamTableEnvironment.create(streamEnvironment, 
setting)
     }
-    val appName = (parameter.get(KEY_APP_NAME(), null), 
parameter.get(KEY_FLINK_APP_NAME, null)) match {
+    val appName = (userParameter.get(KEY_APP_NAME(), null), 
userParameter.get(KEY_FLINK_APP_NAME, null)) match {
       case (appName: String, _) => appName
       case (null, appName: String) => appName
       case _ => null
@@ -233,15 +223,15 @@ private[flink] class FlinkTableInitializer(args: 
Array[String], apiType: ApiType
       case ApiType.java =>
         if (javaTableEnvConfFunc != null) {
           tableMode match {
-            case TableMode.batch => 
javaTableEnvConfFunc.configuration(localTableEnv.getConfig, parameter)
-            case TableMode.streaming => 
javaTableEnvConfFunc.configuration(localStreamTableEnv.getConfig, parameter)
+            case TableMode.batch => 
javaTableEnvConfFunc.configuration(localTableEnv.getConfig, userParameter)
+            case TableMode.streaming => 
javaTableEnvConfFunc.configuration(localStreamTableEnv.getConfig, userParameter)
           }
         }
       case ApiType.scala =>
         if (tableConfFunc != null) {
           tableMode match {
-            case TableMode.batch => tableConfFunc(localTableEnv.getConfig, 
parameter)
-            case TableMode.streaming => 
tableConfFunc(localStreamTableEnv.getConfig, parameter)
+            case TableMode.batch => tableConfFunc(localTableEnv.getConfig, 
userParameter)
+            case TableMode.streaming => 
tableConfFunc(localStreamTableEnv.getConfig, userParameter)
           }
         }
     }
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
index 5e132d1fa..635a491e4 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
@@ -22,7 +22,7 @@ import org.apache.streampark.common.enums.TableMode.TableMode
 import org.apache.streampark.common.enums.{ApiType, TableMode}
 import org.apache.streampark.common.util.{DeflaterUtils, PropertiesUtils}
 import org.apache.flink.api.java.utils.ParameterTool
-import org.apache.flink.configuration.PipelineOptions
+import org.apache.flink.configuration.{Configuration, PipelineOptions}
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
 import org.apache.flink.table.api.{EnvironmentSettings, TableConfig, 
TableEnvironment}
@@ -48,7 +48,7 @@ private[flink] object FlinkTableInitializer {
         }
       }
     }
-    (flinkInitializer.parameter, flinkInitializer.tableEnvironment)
+    (flinkInitializer.userParameter, flinkInitializer.tableEnvironment)
   }
 
   def initialize(args: TableEnvConfig): (ParameterTool, TableEnvironment) = {
@@ -61,7 +61,7 @@ private[flink] object FlinkTableInitializer {
         }
       }
     }
-    (flinkInitializer.parameter, flinkInitializer.tableEnvironment)
+    (flinkInitializer.userParameter, flinkInitializer.tableEnvironment)
   }
 
   def initialize(args: Array[String],
@@ -78,7 +78,7 @@ private[flink] object FlinkTableInitializer {
         }
       }
     }
-    (flinkInitializer.parameter, flinkInitializer.streamEnvironment, 
flinkInitializer.streamTableEnvironment)
+    (flinkInitializer.userParameter, flinkInitializer.streamEnvironment, 
flinkInitializer.streamTableEnvironment)
   }
 
   def initialize(args: StreamTableEnvConfig):
@@ -93,7 +93,7 @@ private[flink] object FlinkTableInitializer {
         }
       }
     }
-    (flinkInitializer.parameter, flinkInitializer.streamEnvironment, 
flinkInitializer.streamTableEnvironment)
+    (flinkInitializer.userParameter, flinkInitializer.streamEnvironment, 
flinkInitializer.streamTableEnvironment)
   }
 
 }
@@ -130,39 +130,29 @@ private[flink] class FlinkTableInitializer(args: 
Array[String], apiType: ApiType
   /**
    * In case of table SQL, the parameter conf is not required, it depends on 
the developer.
    */
-  override def initParameter(): ParameterTool = {
-    val argsMap = ParameterTool.fromArgs(args)
-    val parameter = argsMap.get(KEY_APP_CONF(), null) match {
-      case null | "" =>
-        logWarn("Usage:can't fond config,you can set \"--conf $path \" in main 
arguments")
-        ParameterTool.fromSystemProperties().mergeWith(argsMap)
-      case file =>
-        val configArgs = super.readFlinkConf(file)
-        // config priority: explicitly specified priority > project profiles > 
system profiles
-        
ParameterTool.fromSystemProperties().mergeWith(ParameterTool.fromMap(configArgs)).mergeWith(argsMap)
-    }
-    parameter.get(KEY_FLINK_SQL()) match {
-      case null => parameter
+  override def initParameter(): (ParameterTool, Configuration) = {
+    val (userParameter: ParameterTool, flinkConf: Configuration) = 
super.initParameter()
+    (userParameter.get(KEY_FLINK_SQL()) match {
+      case null => userParameter
       case param =>
-        //for streampark-console
+        // for streampark-console
         Try(DeflaterUtils.unzipString(param)) match {
-          case Success(value) => 
parameter.mergeWith(ParameterTool.fromMap(Map(KEY_FLINK_SQL() -> value)))
+          case Success(value) => 
userParameter.mergeWith(ParameterTool.fromMap(Map(KEY_FLINK_SQL() -> value)))
           case Failure(_) =>
             val sqlFile = new File(param)
             Try(PropertiesUtils.fromYamlFile(sqlFile.getAbsolutePath)) match {
-              case Success(value) => 
parameter.mergeWith(ParameterTool.fromMap(value))
+              case Success(value) => 
userParameter.mergeWith(ParameterTool.fromMap(value))
               case Failure(e) =>
                 new IllegalArgumentException(s"[StreamPark] init sql error.$e")
-                parameter
+                userParameter
             }
         }
-    }
-
+    }, flinkConf)
   }
 
   def initEnvironment(tableMode: TableMode): Unit = {
     val builder = EnvironmentSettings.newInstance()
-    val mode = 
Try(TableMode.withName(parameter.get(KEY_FLINK_TABLE_MODE))).getOrElse(tableMode)
+    val mode = 
Try(TableMode.withName(userParameter.get(KEY_FLINK_TABLE_MODE))).getOrElse(tableMode)
     mode match {
       case TableMode.batch =>
         logInfo(s"components should work in $tableMode mode")
@@ -172,7 +162,7 @@ private[flink] class FlinkTableInitializer(args: 
Array[String], apiType: ApiType
         builder.inStreamingMode()
     }
 
-    val buildWith = (parameter.get(KEY_FLINK_TABLE_CATALOG), 
parameter.get(KEY_FLINK_TABLE_DATABASE))
+    val buildWith = (userParameter.get(KEY_FLINK_TABLE_CATALOG), 
userParameter.get(KEY_FLINK_TABLE_DATABASE))
     buildWith match {
       case (x: String, y: String) if x != null && y != null =>
         logInfo(s"with built in catalog: $x")
@@ -193,14 +183,14 @@ private[flink] class FlinkTableInitializer(args: 
Array[String], apiType: ApiType
       case TableMode.streaming =>
         initEnvironment()
         if (streamEnvConfFunc != null) {
-          streamEnvConfFunc(streamEnvironment, parameter)
+          streamEnvConfFunc(streamEnvironment, userParameter)
         }
         if (javaStreamEnvConfFunc != null) {
-          javaStreamEnvConfFunc.configuration(streamEnvironment.getJavaEnv, 
parameter)
+          javaStreamEnvConfFunc.configuration(streamEnvironment.getJavaEnv, 
userParameter)
         }
         localStreamTableEnv = StreamTableEnvironment.create(streamEnvironment, 
setting)
     }
-    val appName = (parameter.get(KEY_APP_NAME(), null), 
parameter.get(KEY_FLINK_APP_NAME, null)) match {
+    val appName = (userParameter.get(KEY_APP_NAME(), null), 
userParameter.get(KEY_FLINK_APP_NAME, null)) match {
       case (appName: String, _) => appName
       case (null, appName: String) => appName
       case _ => null
@@ -216,15 +206,15 @@ private[flink] class FlinkTableInitializer(args: 
Array[String], apiType: ApiType
       case ApiType.java =>
         if (javaTableEnvConfFunc != null) {
           tableMode match {
-            case TableMode.batch => 
javaTableEnvConfFunc.configuration(localTableEnv.getConfig, parameter)
-            case TableMode.streaming => 
javaTableEnvConfFunc.configuration(localStreamTableEnv.getConfig, parameter)
+            case TableMode.batch => 
javaTableEnvConfFunc.configuration(localTableEnv.getConfig, userParameter)
+            case TableMode.streaming => 
javaTableEnvConfFunc.configuration(localStreamTableEnv.getConfig, userParameter)
           }
         }
       case ApiType.scala =>
         if (tableConfFunc != null) {
           tableMode match {
-            case TableMode.batch => tableConfFunc(localTableEnv.getConfig, 
parameter)
-            case TableMode.streaming => 
tableConfFunc(localStreamTableEnv.getConfig, parameter)
+            case TableMode.batch => tableConfFunc(localTableEnv.getConfig, 
userParameter)
+            case TableMode.streaming => 
tableConfFunc(localStreamTableEnv.getConfig, userParameter)
           }
         }
     }
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
index 6a8207f5d..e37f4d21d 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
@@ -17,13 +17,13 @@
 
 package org.apache.streampark.flink.core
 
-import org.apache.streampark.common.conf.ConfigConst.{KEY_APP_CONF, 
KEY_APP_NAME, KEY_FLINK_APP_NAME, KEY_FLINK_SQL, KEY_FLINK_TABLE_CATALOG, 
KEY_FLINK_TABLE_DATABASE, KEY_FLINK_TABLE_MODE}
+import org.apache.streampark.common.conf.ConfigConst.{KEY_APP_NAME, 
KEY_FLINK_APP_NAME, KEY_FLINK_SQL, KEY_FLINK_TABLE_CATALOG, 
KEY_FLINK_TABLE_DATABASE, KEY_FLINK_TABLE_MODE}
 import org.apache.streampark.common.enums.{ApiType, TableMode}
 import org.apache.streampark.common.enums.ApiType.ApiType
 import org.apache.streampark.common.enums.TableMode.TableMode
 import org.apache.streampark.common.util.{DeflaterUtils, PropertiesUtils}
 import org.apache.flink.api.java.utils.ParameterTool
-import org.apache.flink.configuration.PipelineOptions
+import org.apache.flink.configuration.{Configuration, PipelineOptions}
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
 import org.apache.flink.table.api.{EnvironmentSettings, TableConfig, 
TableEnvironment}
@@ -49,7 +49,7 @@ private[flink] object FlinkTableInitializer {
         }
       }
     }
-    (flinkInitializer.parameter, flinkInitializer.tableEnvironment)
+    (flinkInitializer.userParameter, flinkInitializer.tableEnvironment)
   }
 
   def initialize(args: TableEnvConfig): (ParameterTool, TableEnvironment) = {
@@ -62,7 +62,7 @@ private[flink] object FlinkTableInitializer {
         }
       }
     }
-    (flinkInitializer.parameter, flinkInitializer.tableEnvironment)
+    (flinkInitializer.userParameter, flinkInitializer.tableEnvironment)
   }
 
   def initialize(args: Array[String],
@@ -79,7 +79,7 @@ private[flink] object FlinkTableInitializer {
         }
       }
     }
-    (flinkInitializer.parameter, flinkInitializer.streamEnvironment, 
flinkInitializer.streamTableEnvironment)
+    (flinkInitializer.userParameter, flinkInitializer.streamEnvironment, 
flinkInitializer.streamTableEnvironment)
   }
 
   def initialize(args: StreamTableEnvConfig):
@@ -94,7 +94,7 @@ private[flink] object FlinkTableInitializer {
         }
       }
     }
-    (flinkInitializer.parameter, flinkInitializer.streamEnvironment, 
flinkInitializer.streamTableEnvironment)
+    (flinkInitializer.userParameter, flinkInitializer.streamEnvironment, 
flinkInitializer.streamTableEnvironment)
   }
 }
 
@@ -130,39 +130,29 @@ private[flink] class FlinkTableInitializer(args: 
Array[String], apiType: ApiType
   /**
    * In case of table SQL, the parameter conf is not required, it depends on 
the developer.
    */
-  override def initParameter(): ParameterTool = {
-    val argsMap = ParameterTool.fromArgs(args)
-    val parameter = argsMap.get(KEY_APP_CONF(), null) match {
-      case null | "" =>
-        logWarn("Usage:can't fond config,you can set \"--conf $path \" in main 
arguments")
-        ParameterTool.fromSystemProperties().mergeWith(argsMap)
-      case file =>
-        val configArgs = super.readFlinkConf(file)
-        // config priority: explicitly specified priority > project profiles > 
system profiles
-        
ParameterTool.fromSystemProperties().mergeWith(ParameterTool.fromMap(configArgs)).mergeWith(argsMap)
-    }
-    parameter.get(KEY_FLINK_SQL()) match {
-      case null => parameter
+  override def initParameter(): (ParameterTool, Configuration) = {
+    val (userParameter: ParameterTool, flinkConf: Configuration) = 
super.initParameter()
+    (userParameter.get(KEY_FLINK_SQL()) match {
+      case null => userParameter
       case param =>
         // for streampark-console
         Try(DeflaterUtils.unzipString(param)) match {
-          case Success(value) => 
parameter.mergeWith(ParameterTool.fromMap(Map(KEY_FLINK_SQL() -> value)))
+          case Success(value) => 
userParameter.mergeWith(ParameterTool.fromMap(Map(KEY_FLINK_SQL() -> value)))
           case Failure(_) =>
             val sqlFile = new File(param)
             Try(PropertiesUtils.fromYamlFile(sqlFile.getAbsolutePath)) match {
-              case Success(value) => 
parameter.mergeWith(ParameterTool.fromMap(value))
+              case Success(value) => 
userParameter.mergeWith(ParameterTool.fromMap(value))
               case Failure(e) =>
                 new IllegalArgumentException(s"[StreamPark] init sql error.$e")
-                parameter
+                userParameter
             }
         }
-    }
-
+    }, flinkConf)
   }
 
   def initEnvironment(tableMode: TableMode): Unit = {
     val builder = EnvironmentSettings.newInstance()
-    val mode = 
Try(TableMode.withName(parameter.get(KEY_FLINK_TABLE_MODE))).getOrElse(tableMode)
+    val mode = 
Try(TableMode.withName(userParameter.get(KEY_FLINK_TABLE_MODE))).getOrElse(tableMode)
     mode match {
       case TableMode.batch =>
         logInfo(s"components should work in $tableMode mode")
@@ -172,7 +162,7 @@ private[flink] class FlinkTableInitializer(args: 
Array[String], apiType: ApiType
         builder.inStreamingMode()
     }
 
-    val buildWith = (parameter.get(KEY_FLINK_TABLE_CATALOG), 
parameter.get(KEY_FLINK_TABLE_DATABASE))
+    val buildWith = (userParameter.get(KEY_FLINK_TABLE_CATALOG), 
userParameter.get(KEY_FLINK_TABLE_DATABASE))
     buildWith match {
       case (x: String, y: String) if x != null && y != null =>
         logInfo(s"with built in catalog: $x")
@@ -193,14 +183,14 @@ private[flink] class FlinkTableInitializer(args: 
Array[String], apiType: ApiType
       case TableMode.streaming =>
         initEnvironment()
         if (streamEnvConfFunc != null) {
-          streamEnvConfFunc(streamEnvironment, parameter)
+          streamEnvConfFunc(streamEnvironment, userParameter)
         }
         if (javaStreamEnvConfFunc != null) {
-          javaStreamEnvConfFunc.configuration(streamEnvironment.getJavaEnv, 
parameter)
+          javaStreamEnvConfFunc.configuration(streamEnvironment.getJavaEnv, 
userParameter)
         }
         localStreamTableEnv = StreamTableEnvironment.create(streamEnvironment, 
setting)
     }
-    val appName = (parameter.get(KEY_APP_NAME(), null), 
parameter.get(KEY_FLINK_APP_NAME, null)) match {
+    val appName = (userParameter.get(KEY_APP_NAME(), null), 
userParameter.get(KEY_FLINK_APP_NAME, null)) match {
       case (appName: String, _) => appName
       case (null, appName: String) => appName
       case _ => null
@@ -216,15 +206,15 @@ private[flink] class FlinkTableInitializer(args: 
Array[String], apiType: ApiType
       case ApiType.java =>
         if (javaTableEnvConfFunc != null) {
           tableMode match {
-            case TableMode.batch => 
javaTableEnvConfFunc.configuration(localTableEnv.getConfig, parameter)
-            case TableMode.streaming => 
javaTableEnvConfFunc.configuration(localStreamTableEnv.getConfig, parameter)
+            case TableMode.batch => 
javaTableEnvConfFunc.configuration(localTableEnv.getConfig, userParameter)
+            case TableMode.streaming => 
javaTableEnvConfFunc.configuration(localStreamTableEnv.getConfig, userParameter)
           }
         }
       case ApiType.scala =>
         if (tableConfFunc != null) {
           tableMode match {
-            case TableMode.batch => tableConfFunc(localTableEnv.getConfig, 
parameter)
-            case TableMode.streaming => 
tableConfFunc(localStreamTableEnv.getConfig, parameter)
+            case TableMode.batch => tableConfFunc(localTableEnv.getConfig, 
userParameter)
+            case TableMode.streaming => 
tableConfFunc(localStreamTableEnv.getConfig, userParameter)
           }
         }
     }


Reply via email to