This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev-2.1.5
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev-2.1.5 by this push:
     new dcef16468 [Bug] load flink-conf bug fixed. (#4109)
dcef16468 is described below

commit dcef16468fc6720a82c7a15e04ebd1bdd93435f7
Author: benjobs <[email protected]>
AuthorDate: Mon Oct 7 10:40:02 2024 +0800

    [Bug] load flink-conf bug fixed. (#4109)
    
    * [Improve] read flinkConf bug fixed.
    
    * [Improve] scala map bug fixed.
---
 .../flink/core/FlinkStreamingInitializer.scala         | 16 +++++++++++++---
 .../streampark/flink/core/FlinkTableInitializer.scala  | 18 ++++++++++++++----
 2 files changed, 27 insertions(+), 7 deletions(-)

diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
index 47a5bce6b..ab1808f1c 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
@@ -86,16 +86,26 @@ private[flink] class FlinkStreamingInitializer(args: Array[String], apiType: Api
       throw new IllegalArgumentException(
         "[StreamPark] Usage:can't fond config, please set \"--conf $path \" in 
main arguments")
     }
-    val flinkConf = extractConfigByPrefix(configMap, KEY_FLINK_PROPERTY_PREFIX)
+    val appFlinkConf = extractConfigByPrefix(configMap, 
KEY_FLINK_PROPERTY_PREFIX)
     val appConf = extractConfigByPrefix(configMap, KEY_APP_PREFIX)
     // config priority: explicitly specified priority > project profiles > system profiles
     val parameter = ParameterTool
       .fromSystemProperties()
-      .mergeWith(ParameterTool.fromMap(flinkConf))
+      .mergeWith(ParameterTool.fromMap(appFlinkConf))
       .mergeWith(ParameterTool.fromMap(appConf))
       .mergeWith(argsMap)
 
-    val envConfig = Configuration.fromMap(flinkConf)
+    val flinkConf: Map[String, String] = {
+      parameter.get(KEY_FLINK_CONF(), null) match {
+        case flinkConf if flinkConf != null =>
+          PropertiesUtils
+            .loadFlinkConfYaml(DeflaterUtils.unzipString(flinkConf))
+            .filter(_._2.nonEmpty)
+        case _ => Map.empty
+      }
+    }
+
+    val envConfig = Configuration.fromMap(flinkConf ++ appFlinkConf)
     FlinkConfiguration(parameter, envConfig, null)
   }
 
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
index fc2d073fc..89f3044d3 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
@@ -19,7 +19,7 @@ package org.apache.streampark.flink.core
 import org.apache.streampark.common.conf.ConfigConst._
 import org.apache.streampark.common.enums.{ApiType, PlannerType}
 import org.apache.streampark.common.enums.ApiType.ApiType
-import org.apache.streampark.common.util.DeflaterUtils
+import org.apache.streampark.common.util.{DeflaterUtils, PropertiesUtils}
 import org.apache.streampark.flink.core.EnhancerImplicit._
 
 import org.apache.flink.api.java.utils.ParameterTool
@@ -182,17 +182,27 @@ private[flink] class FlinkTableInitializer(args: Array[String], apiType: ApiType
         FlinkConfiguration(cliParameterTool, new Configuration(), new 
Configuration())
       } else {
         // config priority: explicitly specified priority > project profiles > system profiles
-        val flinkConf = extractConfigByPrefix(configMap, 
KEY_FLINK_PROPERTY_PREFIX)
+        val appFlinkConf = extractConfigByPrefix(configMap, 
KEY_FLINK_PROPERTY_PREFIX)
         val appConf = extractConfigByPrefix(configMap, KEY_APP_PREFIX)
         val tableConf = extractConfigByPrefix(configMap, 
KEY_FLINK_TABLE_PREFIX)
         val sqlConf = extractConfigByPrefix(configMap, KEY_SQL_PREFIX)
 
-        val envConfig = Configuration.fromMap(flinkConf)
+        val flinkConf: Map[String, String] = {
+          parameter.get(KEY_FLINK_CONF(), null) match {
+            case flinkConf if flinkConf != null =>
+              PropertiesUtils
+                .loadFlinkConfYaml(DeflaterUtils.unzipString(flinkConf))
+                .filter(_._2.nonEmpty)
+            case _ => Map.empty
+          }
+        }
+
+        val envConfig = Configuration.fromMap(flinkConf ++ appFlinkConf)
         val tableConfig = Configuration.fromMap(tableConf)
 
         val parameterTool = ParameterTool
           .fromSystemProperties()
-          .mergeWith(ParameterTool.fromMap(flinkConf))
+          .mergeWith(ParameterTool.fromMap(appFlinkConf))
           .mergeWith(ParameterTool.fromMap(appConf))
           .mergeWith(ParameterTool.fromMap(tableConf))
           .mergeWith(ParameterTool.fromMap(sqlConf))

Reply via email to