This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev-2.1.5
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev-2.1.5 by this push:
     new bbe77310f [Improve] load flink-config minor improvements
bbe77310f is described below

commit bbe77310f3801ab7ed7ece7f82454598e09df276
Author: benjobs <[email protected]>
AuthorDate: Mon Oct 7 11:43:39 2024 +0800

    [Improve] load flink-config minor improvements
---
 .../streampark/flink/client/trait/FlinkClientTrait.scala     |  2 +-
 .../streampark/flink/core/FlinkStreamingInitializer.scala    | 12 +-----------
 .../apache/streampark/flink/core/FlinkTableInitializer.scala | 12 +-----------
 3 files changed, 3 insertions(+), 23 deletions(-)

diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index 896767ca3..0437dbaab 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -77,7 +77,7 @@ trait FlinkClientTrait extends Logger {
          |""".stripMargin)
 
     val flinkConfig = prepareConfig(submitRequest)
-
+    flinkConfig.toMap.foreach(c => logInfo(s"flinkConfig:  ${c._1}: ${c._2}"))
     setConfig(submitRequest, flinkConfig)
 
     Try(doSubmit(submitRequest, flinkConfig)) match {
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
index ab1808f1c..27c72cd19 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
@@ -95,17 +95,7 @@ private[flink] class FlinkStreamingInitializer(args: Array[String], apiType: Api
       .mergeWith(ParameterTool.fromMap(appConf))
       .mergeWith(argsMap)
 
-    val flinkConf: Map[String, String] = {
-      parameter.get(KEY_FLINK_CONF(), null) match {
-        case flinkConf if flinkConf != null =>
-          PropertiesUtils
-            .loadFlinkConfYaml(DeflaterUtils.unzipString(flinkConf))
-            .filter(_._2.nonEmpty)
-        case _ => Map.empty
-      }
-    }
-
-    val envConfig = Configuration.fromMap(flinkConf ++ appFlinkConf)
+    val envConfig = Configuration.fromMap(appFlinkConf)
     FlinkConfiguration(parameter, envConfig, null)
   }
 
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
index 89f3044d3..1727a72a0 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
@@ -187,17 +187,7 @@ private[flink] class FlinkTableInitializer(args: Array[String], apiType: ApiType
         val tableConf = extractConfigByPrefix(configMap, KEY_FLINK_TABLE_PREFIX)
         val sqlConf = extractConfigByPrefix(configMap, KEY_SQL_PREFIX)
 
-        val flinkConf: Map[String, String] = {
-          parameter.get(KEY_FLINK_CONF(), null) match {
-            case flinkConf if flinkConf != null =>
-              PropertiesUtils
-                .loadFlinkConfYaml(DeflaterUtils.unzipString(flinkConf))
-                .filter(_._2.nonEmpty)
-            case _ => Map.empty
-          }
-        }
-
-        val envConfig = Configuration.fromMap(flinkConf ++ appFlinkConf)
+        val envConfig = Configuration.fromMap(appFlinkConf)
         val tableConfig = Configuration.fromMap(tableConf)
 
         val parameterTool = ParameterTool

Reply via email to