This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.5
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.5 by this push:
new dcef16468 [Bug] load flink-conf bug fixed. (#4109)
dcef16468 is described below
commit dcef16468fc6720a82c7a15e04ebd1bdd93435f7
Author: benjobs <[email protected]>
AuthorDate: Mon Oct 7 10:40:02 2024 +0800
[Bug] load flink-conf bug fixed. (#4109)
* [Improve] read flinkConf bug fixed.
* [Improve] scala map bug fixed.
---
.../flink/core/FlinkStreamingInitializer.scala | 16 +++++++++++++---
.../streampark/flink/core/FlinkTableInitializer.scala | 18 ++++++++++++++----
2 files changed, 27 insertions(+), 7 deletions(-)
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
index 47a5bce6b..ab1808f1c 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
@@ -86,16 +86,26 @@ private[flink] class FlinkStreamingInitializer(args:
Array[String], apiType: Api
throw new IllegalArgumentException(
"[StreamPark] Usage:can't fond config, please set \"--conf $path \" in
main arguments")
}
- val flinkConf = extractConfigByPrefix(configMap, KEY_FLINK_PROPERTY_PREFIX)
+ val appFlinkConf = extractConfigByPrefix(configMap,
KEY_FLINK_PROPERTY_PREFIX)
val appConf = extractConfigByPrefix(configMap, KEY_APP_PREFIX)
// config priority: explicitly specified priority > project profiles > system profiles
val parameter = ParameterTool
.fromSystemProperties()
- .mergeWith(ParameterTool.fromMap(flinkConf))
+ .mergeWith(ParameterTool.fromMap(appFlinkConf))
.mergeWith(ParameterTool.fromMap(appConf))
.mergeWith(argsMap)
- val envConfig = Configuration.fromMap(flinkConf)
+ val flinkConf: Map[String, String] = {
+ parameter.get(KEY_FLINK_CONF(), null) match {
+ case flinkConf if flinkConf != null =>
+ PropertiesUtils
+ .loadFlinkConfYaml(DeflaterUtils.unzipString(flinkConf))
+ .filter(_._2.nonEmpty)
+ case _ => Map.empty
+ }
+ }
+
+ val envConfig = Configuration.fromMap(flinkConf ++ appFlinkConf)
FlinkConfiguration(parameter, envConfig, null)
}
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
index fc2d073fc..89f3044d3 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
@@ -19,7 +19,7 @@ package org.apache.streampark.flink.core
import org.apache.streampark.common.conf.ConfigConst._
import org.apache.streampark.common.enums.{ApiType, PlannerType}
import org.apache.streampark.common.enums.ApiType.ApiType
-import org.apache.streampark.common.util.DeflaterUtils
+import org.apache.streampark.common.util.{DeflaterUtils, PropertiesUtils}
import org.apache.streampark.flink.core.EnhancerImplicit._
import org.apache.flink.api.java.utils.ParameterTool
@@ -182,17 +182,27 @@ private[flink] class FlinkTableInitializer(args:
Array[String], apiType: ApiType
FlinkConfiguration(cliParameterTool, new Configuration(), new
Configuration())
} else {
// config priority: explicitly specified priority > project profiles > system profiles
- val flinkConf = extractConfigByPrefix(configMap,
KEY_FLINK_PROPERTY_PREFIX)
+ val appFlinkConf = extractConfigByPrefix(configMap,
KEY_FLINK_PROPERTY_PREFIX)
val appConf = extractConfigByPrefix(configMap, KEY_APP_PREFIX)
val tableConf = extractConfigByPrefix(configMap,
KEY_FLINK_TABLE_PREFIX)
val sqlConf = extractConfigByPrefix(configMap, KEY_SQL_PREFIX)
- val envConfig = Configuration.fromMap(flinkConf)
+ val flinkConf: Map[String, String] = {
+ parameter.get(KEY_FLINK_CONF(), null) match {
+ case flinkConf if flinkConf != null =>
+ PropertiesUtils
+ .loadFlinkConfYaml(DeflaterUtils.unzipString(flinkConf))
+ .filter(_._2.nonEmpty)
+ case _ => Map.empty
+ }
+ }
+
+ val envConfig = Configuration.fromMap(flinkConf ++ appFlinkConf)
val tableConfig = Configuration.fromMap(tableConf)
val parameterTool = ParameterTool
.fromSystemProperties()
- .mergeWith(ParameterTool.fromMap(flinkConf))
+ .mergeWith(ParameterTool.fromMap(appFlinkConf))
.mergeWith(ParameterTool.fromMap(appConf))
.mergeWith(ParameterTool.fromMap(tableConf))
.mergeWith(ParameterTool.fromMap(sqlConf))