This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.5
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.5 by this push:
new bbe77310f [Improve] load flink-config minor improvements
bbe77310f is described below
commit bbe77310f3801ab7ed7ece7f82454598e09df276
Author: benjobs <[email protected]>
AuthorDate: Mon Oct 7 11:43:39 2024 +0800
[Improve] load flink-config minor improvements
---
.../streampark/flink/client/trait/FlinkClientTrait.scala | 2 +-
.../streampark/flink/core/FlinkStreamingInitializer.scala | 12 +-----------
.../apache/streampark/flink/core/FlinkTableInitializer.scala | 12 +-----------
3 files changed, 3 insertions(+), 23 deletions(-)
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index 896767ca3..0437dbaab 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -77,7 +77,7 @@ trait FlinkClientTrait extends Logger {
|""".stripMargin)
val flinkConfig = prepareConfig(submitRequest)
-
+ flinkConfig.toMap.foreach(c => logInfo(s"flinkConfig: ${c._1}: ${c._2}"))
setConfig(submitRequest, flinkConfig)
Try(doSubmit(submitRequest, flinkConfig)) match {
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
index ab1808f1c..27c72cd19 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
@@ -95,17 +95,7 @@ private[flink] class FlinkStreamingInitializer(args: Array[String], apiType: Api
.mergeWith(ParameterTool.fromMap(appConf))
.mergeWith(argsMap)
- val flinkConf: Map[String, String] = {
- parameter.get(KEY_FLINK_CONF(), null) match {
- case flinkConf if flinkConf != null =>
- PropertiesUtils
- .loadFlinkConfYaml(DeflaterUtils.unzipString(flinkConf))
- .filter(_._2.nonEmpty)
- case _ => Map.empty
- }
- }
-
- val envConfig = Configuration.fromMap(flinkConf ++ appFlinkConf)
+ val envConfig = Configuration.fromMap(appFlinkConf)
FlinkConfiguration(parameter, envConfig, null)
}
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
index 89f3044d3..1727a72a0 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
@@ -187,17 +187,7 @@ private[flink] class FlinkTableInitializer(args: Array[String], apiType: ApiType
val tableConf = extractConfigByPrefix(configMap,
KEY_FLINK_TABLE_PREFIX)
val sqlConf = extractConfigByPrefix(configMap, KEY_SQL_PREFIX)
- val flinkConf: Map[String, String] = {
- parameter.get(KEY_FLINK_CONF(), null) match {
- case flinkConf if flinkConf != null =>
- PropertiesUtils
- .loadFlinkConfYaml(DeflaterUtils.unzipString(flinkConf))
- .filter(_._2.nonEmpty)
- case _ => Map.empty
- }
- }
-
- val envConfig = Configuration.fromMap(flinkConf ++ appFlinkConf)
+ val envConfig = Configuration.fromMap(appFlinkConf)
val tableConfig = Configuration.fromMap(tableConf)
val parameterTool = ParameterTool