This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.5
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.5 by this push:
new aaf7c7422 Support the ${jobName} variable in Flink configuration.
(#4062)
aaf7c7422 is described below
commit aaf7c7422300cac274ad370ff14c0eda0e05ddf6
Author: Darcy <[email protected]>
AuthorDate: Sun Sep 15 11:43:28 2024 +0800
Support the ${jobName} variable in Flink configuration. (#4062)
* feature: support jobName var in flink conf
* fix issue
---------
Co-authored-by: benjobs <[email protected]>
---
.../streampark/common/utils/CommonUtils.java | 26 ++++++++++++++++++++++
.../streampark/console/core/entity/FlinkEnv.java | 8 +++++--
.../core/service/impl/ApplicationServiceImpl.java | 2 +-
.../core/service/impl/SavepointServiceImpl.java | 5 +++--
.../console/core/task/FlinkK8sWatcherWrapper.java | 2 +-
.../impl/KubernetesNativeSessionClient.scala | 1 -
.../flink/client/trait/FlinkClientTrait.scala | 20 +++++++++++++----
7 files changed, 53 insertions(+), 11 deletions(-)
diff --git
a/streampark-common/src/main/java/org/apache/streampark/common/utils/CommonUtils.java
b/streampark-common/src/main/java/org/apache/streampark/common/utils/CommonUtils.java
new file mode 100644
index 000000000..eb022491c
--- /dev/null
+++
b/streampark-common/src/main/java/org/apache/streampark/common/utils/CommonUtils.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.common.utils;
+
+public class CommonUtils {
+ private CommonUtils() {}
+
+ public static String fixedValueBaseVar(String configValue, String jobName) {
+ return configValue.replace("${jobName}", jobName);
+ }
+}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java
index b1ed53da8..4662aeb6e 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java
@@ -20,6 +20,7 @@ package org.apache.streampark.console.core.entity;
import org.apache.streampark.common.conf.FlinkVersion;
import org.apache.streampark.common.util.DeflaterUtils;
import org.apache.streampark.common.util.PropertiesUtils;
+import org.apache.streampark.common.utils.CommonUtils;
import org.apache.streampark.console.base.exception.ApiAlertException;
import org.apache.streampark.console.base.exception.ApiDetailException;
@@ -115,11 +116,14 @@ public class FlinkEnv implements Serializable {
}
@JsonIgnore
- public Properties getFlinkConfig() {
+ public Properties getFlinkConfig(Application application) {
String flinkYamlString = DeflaterUtils.unzipString(flinkConf);
Properties flinkConfig = new Properties();
Map<String, String> config =
PropertiesUtils.loadFlinkConfYaml(flinkYamlString);
- flinkConfig.putAll(config);
+ for (Map.Entry<String, String> entry : config.entrySet()) {
+ String value = CommonUtils.fixedValueBaseVar(entry.getValue(),
application.getJobName());
+ flinkConfig.setProperty(entry.getKey(), value);
+ }
return flinkConfig;
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 2062baee3..881706aca 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -1820,7 +1820,7 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
if
(ExecutionMode.isKubernetesApplicationMode(application.getExecutionMode())) {
String archiveDir =
-
flinkEnv.getFlinkConfig().getProperty(JobManagerOptions.ARCHIVE_DIR.key());
+
flinkEnv.getFlinkConfig(application).getProperty(JobManagerOptions.ARCHIVE_DIR.key());
if (archiveDir != null) {
properties.put(JobManagerOptions.ARCHIVE_DIR.key(), archiveDir);
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavepointServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavepointServiceImpl.java
index a1455b281..ebefe6197 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavepointServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavepointServiceImpl.java
@@ -155,7 +155,8 @@ public class SavepointServiceImpl extends
ServiceImpl<SavepointMapper, Savepoint
}
if (cpThreshold == 0) {
- String flinkConfNumRetained =
flinkEnv.getFlinkConfig().getProperty(numRetainedKey);
+ String flinkConfNumRetained =
+ flinkEnv.getFlinkConfig(application).getProperty(numRetainedKey);
int numRetainedDefaultValue = 1;
if (flinkConfNumRetained != null) {
try {
@@ -292,7 +293,7 @@ public class SavepointServiceImpl extends
ServiceImpl<SavepointMapper, Savepoint
if (StringUtils.isBlank(savepointPath)) {
// flink
FlinkEnv flinkEnv = flinkEnvService.getById(application.getVersionId());
- Properties flinkConfig = flinkEnv.getFlinkConfig();
+ Properties flinkConfig = flinkEnv.getFlinkConfig(application);
savepointPath =
flinkConfig.getProperty(
CheckpointingOptions.SAVEPOINT_DIRECTORY.key(),
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
index c449b4e21..a09f85ef9 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
@@ -119,7 +119,7 @@ public class FlinkK8sWatcherWrapper {
public TrackId toTrackId(Application app) {
FlinkEnv flinkEnv = flinkEnvService.getById(app.getVersionId());
- Properties properties = flinkEnv.getFlinkConfig();
+ Properties properties = flinkEnv.getFlinkConfig(app);
Map<String, String> dynamicProperties =
PropertiesUtils.extractDynamicPropertiesAsJava(app.getDynamicProperties());
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
index 4bd37db9a..51e6fd63d 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
@@ -27,7 +27,6 @@ import
org.apache.streampark.flink.kubernetes.KubernetesRetriever
import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode
import org.apache.streampark.flink.kubernetes.model.ClusterKey
-import io.fabric8.kubernetes.api.model.{Config => _}
import org.apache.commons.lang3.StringUtils
import org.apache.flink.client.program.{ClusterClient, PackagedProgram}
import org.apache.flink.configuration._
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index 675b0ea08..ff7c7304c 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -376,7 +376,12 @@ trait FlinkClientTrait extends Logger {
commandLine)
val configuration =
- applyConfiguration(submitRequest.flinkVersion.flinkHome,
activeCommandLine, commandLine)
+ applyConfiguration(
+ submitRequest.flinkVersion.flinkHome,
+ activeCommandLine,
+ commandLine,
+ submitRequest.id.toString,
+ submitRequest.effectiveAppName)
commandLine -> configuration
@@ -443,7 +448,9 @@ trait FlinkClientTrait extends Logger {
private[this] def applyConfiguration(
flinkHome: String,
activeCustomCommandLine: CustomCommandLine,
- commandLine: CommandLine): Configuration = {
+ commandLine: CommandLine,
+ jobId: String = null,
+ jobName: String = null): Configuration = {
require(activeCustomCommandLine != null, "activeCustomCommandLine must not
be null.")
val configuration = new Configuration()
@@ -451,9 +458,14 @@ trait FlinkClientTrait extends Logger {
flinkDefaultConfiguration.keySet.foreach(
key => {
val value = flinkDefaultConfiguration.getString(key, null)
- if (value != null) {
- configuration.setString(key, value)
+ var result = value
+ if (value != null && StringUtils.isNotBlank(jobName)) {
+ result = value.replaceAll("\\$\\{job(Name|name)}|\\$job(Name|name)",
jobName)
}
+ if (jobId != null) {
+ result = result.replaceAll("\\$\\{job(Id|id)}|\\$job(Id|id)", jobId)
+ }
+ configuration.setString(key, result)
})
configuration.addAll(activeCustomCommandLine.toConfiguration(commandLine))
configuration