This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.5
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.5 by this push:
new 5d9f5fbe6 [Improve] get flink config minor improvement
5d9f5fbe6 is described below
commit 5d9f5fbe6be65b25612002905439e7ebbf2a386b
Author: benjobs <[email protected]>
AuthorDate: Sun Sep 15 20:44:34 2024 +0800
[Improve] get flink config minor improvement
---
.../flink/client/bean/SubmitRequest.scala | 32 +++++++++++++---------
.../impl/KubernetesNativeSessionClient.scala | 3 ++
.../flink/client/impl/YarnSessionClient.scala | 2 ++
.../flink/client/trait/FlinkClientTrait.scala | 14 ++++++++++
.../flink/core/FlinkStreamTableTrait.scala | 2 +-
.../apache/streampark/flink/cli/SqlClient.scala | 2 +-
6 files changed, 40 insertions(+), 15 deletions(-)
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
index 49854e74e..4cdcf07c1 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
@@ -102,19 +102,25 @@ case class SubmitRequest(
lazy val flinkDefaultConfiguration: Configuration = {
Try(GlobalConfiguration.loadConfiguration(s"${flinkVersion.flinkHome}/conf")) match {
case Success(value) =>
- value
- .keySet()
- .foreach(
- k => {
- val v = value.getString(k, null)
- if (v != null) {
- val result = v
- .replaceAll("\\$\\{job(Name|name)}|\\$job(Name|name)", effectiveAppName)
- .replaceAll("\\$\\{job(Id|id)}|\\$job(Id|id)", id.toString)
- value.setString(k, result)
- }
- })
- value
+ executionMode match {
+ case ExecutionMode.YARN_SESSION | ExecutionMode.KUBERNETES_NATIVE_SESSION |
+ ExecutionMode.REMOTE =>
+ value
+ case _ =>
+ value
+ .keySet()
+ .foreach(
+ k => {
+ val v = value.getString(k, null)
+ if (v != null) {
+ val result = v
+ .replaceAll("\\$\\{job(Name|name)}|\\$job(Name|name)", effectiveAppName)
+ .replaceAll("\\$\\{job(Id|id)}|\\$job(Id|id)", id.toString)
+ value.setString(k, result)
+ }
+ })
+ value
+ }
case _ => new Configuration()
}
}
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
index 51e6fd63d..4c3cf5712 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
@@ -150,6 +150,9 @@ object KubernetesNativeSessionClient extends KubernetesNativeClientTrait with Lo
|""".stripMargin)
val flinkConfig = getFlinkK8sConfig(deployRequest)
+
+ replaceConfig(flinkConfig, "\\$\\{job(Name|name)}|\\$job(Name|name)", deployRequest.clusterName)
+
val kubeClient =
FlinkKubeClientFactory.getInstance.fromConfiguration(flinkConfig, "client")
var clusterDescriptor: KubernetesClusterDescriptor = null
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
index eb49ad4a1..76d86b8b6 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
@@ -77,6 +77,8 @@ object YarnSessionClient extends YarnClientTrait {
// app name
.safeSet(YarnConfigOptions.APPLICATION_NAME, deployRequest.clusterName)
+ replaceConfig(flinkConfig, "\\$\\{job(Name|name)}|\\$job(Name|name)", deployRequest.clusterName)
+
logInfo(s"""
|------------------------------------------------------------------
|Effective submit configuration: $flinkConfig
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index 896767ca3..d741df50f 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -496,4 +496,18 @@ trait FlinkClientTrait extends Logger {
})
}
+ def replaceConfig(flinkConfig: Configuration, regexp: String, replacement: String): Unit = {
+ flinkConfig
+ .keySet()
+ .foreach(
+ k => {
+ val v = flinkConfig.getString(k, null)
+ if (v != null) {
+ val result = v
+ .replaceAll(regexp, replacement)
+ flinkConfig.setString(k, result)
+ }
+ })
+ }
+
}
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamTableTrait.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamTableTrait.scala
index b0a746005..47acb23ea 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamTableTrait.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamTableTrait.scala
@@ -72,7 +72,7 @@ abstract class FlinkStreamTableTrait(
/** Recommended to use this Api to start tasks */
def start(name: String = null): JobExecutionResult = {
- val appName = parameter.getAppName(name, true)
+ val appName = parameter.getAppName(name, required = true)
execute(appName)
}
diff --git a/streampark-flink/streampark-flink-sqlclient/src/main/scala/org/apache/streampark/flink/cli/SqlClient.scala b/streampark-flink/streampark-flink-sqlclient/src/main/scala/org/apache/streampark/flink/cli/SqlClient.scala
index c80d520f9..5410e026a 100644
--- a/streampark-flink/streampark-flink-sqlclient/src/main/scala/org/apache/streampark/flink/cli/SqlClient.scala
+++ b/streampark-flink/streampark-flink-sqlclient/src/main/scala/org/apache/streampark/flink/cli/SqlClient.scala
@@ -53,7 +53,7 @@ object SqlClient extends App {
private[this] val mode = sets.find(_.operands.head ==
ExecutionOptions.RUNTIME_MODE.key()) match {
case Some(e) =>
- // 1) flink sql execution.runtime-mode has highest priority
+ // 1) flink sql execution.runtime-mode has the highest priority
val m = e.operands(1).toUpperCase()
arguments += s"--${ExecutionOptions.RUNTIME_MODE.key()} $m"
m