This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 0a00aa44e5 [Improvement-17986][task-plugin] Support parameter
replacement in Flink and FlinkStream task (#17987)
0a00aa44e5 is described below
commit 0a00aa44e5f3641515c3955f93c851e8bf9b8d4b
Author: macdoor <[email protected]>
AuthorDate: Sun Mar 1 18:02:51 2026 +0800
[Improvement-17986][task-plugin] Support parameter replacement in Flink and
FlinkStream task (#17987)
---
.../plugin/task/flink/FlinkStreamTask.java | 12 +--
.../plugin/task/flink/FlinkStreamTaskTest.java | 110 +++++++++++++++++++++
.../plugin/task/flink/FlinkTask.java | 34 ++++++-
.../plugin/task/flink/FlinkTaskTest.java | 107 ++++++++++++++++++++
4 files changed, 248 insertions(+), 15 deletions(-)
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTask.java
index 3c2c1e046b..a9ce0fe4d8 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTask.java
@@ -28,7 +28,6 @@ import org.apache.commons.collections4.CollectionUtils;
import java.io.IOException;
import java.util.List;
-import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
@@ -53,20 +52,11 @@ public class FlinkStreamTask extends FlinkTask implements
StreamTask {
if (flinkParameters == null || !flinkParameters.checkParameters()) {
throw new RuntimeException("flink task params is not valid");
}
-
- FileUtils.generateScriptFile(taskExecutionContext, flinkParameters);
}
- /**
- * create command
- *
- * @return command
- */
@Override
protected String getScript() {
- // flink run/run-application [OPTIONS] <jar-file> <arguments>
- List<String> args =
FlinkArgsUtils.buildRunCommandLine(taskExecutionContext, flinkParameters);
- return args.stream().collect(Collectors.joining(" "));
+ return buildScriptWithParameterReplacement(flinkParameters);
}
@Override
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTaskTest.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTaskTest.java
new file mode 100644
index 0000000000..46fd73ada2
--- /dev/null
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTaskTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.flink;
+
+import static
org.apache.dolphinscheduler.common.constants.DateConstants.PARAMETER_DATETIME;
+
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.model.Property;
+
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+public class FlinkStreamTaskTest {
+
+ @TempDir
+ Path tempDir;
+
+ @Test
+ public void testParameterReplacementInScript() throws Exception {
+ String executePath = tempDir.toString();
+ String taskAppId = "test-app";
+
+ FlinkStreamParameters flinkParameters = new FlinkStreamParameters();
+ flinkParameters.setProgramType(ProgramType.SQL);
+ flinkParameters.setDeployMode(FlinkDeployMode.LOCAL);
+ flinkParameters.setParallelism(2);
+ flinkParameters.setInitScript("SET 'date' = '${bizdate}';");
+ flinkParameters.setRawScript("SELECT * FROM logs WHERE dt =
'${bizdate}' AND env = '${env}'");
+
+ Map<String, Property> prepareParamsMap = new HashMap<>();
+ prepareParamsMap.put("bizdate", new Property("bizdate", null, null,
"20250601"));
+ prepareParamsMap.put("env", new Property("env", null, null, "prod"));
+
+ TaskExecutionContext context = new TaskExecutionContext();
+ context.setTaskParams(JSONUtils.toJsonString(flinkParameters));
+ context.setExecutePath(executePath);
+ context.setTaskAppId(taskAppId);
+ context.setPrepareParamsMap(prepareParamsMap);
+
+ FlinkStreamTask task = new FlinkStreamTask(context);
+ task.init();
+ task.getScript();
+
+ String initScriptPath = String.format("%s/%s_init.sql", executePath,
taskAppId);
+ String nodeScriptPath = String.format("%s/%s_node.sql", executePath,
taskAppId);
+
+ String initContent = new
String(Files.readAllBytes(Paths.get(initScriptPath)), StandardCharsets.UTF_8);
+ String nodeContent = new
String(Files.readAllBytes(Paths.get(nodeScriptPath)), StandardCharsets.UTF_8);
+
+ String expectedInitOptions =
String.join(FlinkConstants.FLINK_SQL_NEWLINE,
+
FlinkArgsUtils.buildInitOptionsForSql(flinkParameters)).concat(FlinkConstants.FLINK_SQL_NEWLINE);
+ Assertions.assertEquals(expectedInitOptions + "SET 'date' =
'20250601';", initContent);
+ Assertions.assertEquals("SELECT * FROM logs WHERE dt = '20250601' AND
env = 'prod'", nodeContent.trim());
+ }
+
+ @Test
+ public void testParameterReplacementTimePlaceholder() throws Exception {
+ String executePath = tempDir.toString();
+ String taskAppId = "test-time";
+
+ FlinkStreamParameters flinkParameters = new FlinkStreamParameters();
+ flinkParameters.setProgramType(ProgramType.SQL);
+ flinkParameters.setDeployMode(FlinkDeployMode.LOCAL);
+ flinkParameters.setParallelism(2);
+ flinkParameters.setInitScript("");
+ flinkParameters.setRawScript("INSERT INTO t SELECT * FROM s WHERE dt =
'$[yyyyMMdd]'");
+
+ Map<String, Property> prepareParamsMap = new HashMap<>();
+ prepareParamsMap.put(PARAMETER_DATETIME, new
Property(PARAMETER_DATETIME, null, null, "20210815080000"));
+
+ TaskExecutionContext context = new TaskExecutionContext();
+ context.setTaskParams(JSONUtils.toJsonString(flinkParameters));
+ context.setExecutePath(executePath);
+ context.setTaskAppId(taskAppId);
+ context.setPrepareParamsMap(prepareParamsMap);
+
+ FlinkStreamTask task = new FlinkStreamTask(context);
+ task.init();
+ task.getScript();
+
+ String nodeScriptPath = String.format("%s/%s_node.sql", executePath,
taskAppId);
+ String nodeContent = new
String(Files.readAllBytes(Paths.get(nodeScriptPath)), StandardCharsets.UTF_8);
+
+ Assertions.assertEquals("INSERT INTO t SELECT * FROM s WHERE dt =
'20210815'", nodeContent.trim());
+ }
+}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java
index 4b3d9f98e1..6f49ea75c5 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java
@@ -21,9 +21,14 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.AbstractYarnTask;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import
org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
+
+import org.apache.commons.lang3.StringUtils;
import java.util.List;
+import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -62,8 +67,6 @@ public class FlinkTask extends AbstractYarnTask {
if (flinkParameters == null || !flinkParameters.checkParameters()) {
throw new RuntimeException("flink task params is not valid");
}
-
- FileUtils.generateScriptFile(taskExecutionContext, flinkParameters);
}
/**
@@ -73,8 +76,31 @@ public class FlinkTask extends AbstractYarnTask {
*/
@Override
protected String getScript() {
- // flink run/run-application [OPTIONS] <jar-file> <arguments>
- List<String> args =
FlinkArgsUtils.buildRunCommandLine(taskExecutionContext, flinkParameters);
+ return buildScriptWithParameterReplacement(flinkParameters);
+ }
+
+ /**
+ * Apply parameter replacement to initScript/rawScript, generate script
files and build run command.
+ *
+ * @param params flink parameters
+ * @return run command string
+ */
+ protected String buildScriptWithParameterReplacement(FlinkParameters
params) {
+ Map<String, Property> paramsMap =
taskExecutionContext.getPrepareParamsMap();
+ Map<String, String> stringParams = ParameterUtils.convert(paramsMap);
+
+ if (StringUtils.isNotBlank(params.getInitScript())) {
+ params.setInitScript(
+
ParameterUtils.convertParameterPlaceholders(params.getInitScript(),
stringParams));
+ }
+ if (StringUtils.isNotBlank(params.getRawScript())) {
+ params.setRawScript(
+
ParameterUtils.convertParameterPlaceholders(params.getRawScript(),
stringParams));
+ }
+
+ FileUtils.generateScriptFile(taskExecutionContext, params);
+
+ List<String> args =
FlinkArgsUtils.buildRunCommandLine(taskExecutionContext, params);
return args.stream().collect(Collectors.joining(" "));
}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTaskTest.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTaskTest.java
new file mode 100644
index 0000000000..5fea3294c1
--- /dev/null
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTaskTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.flink;
+
+import static
org.apache.dolphinscheduler.common.constants.DateConstants.PARAMETER_DATETIME;
+
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.model.Property;
+
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+public class FlinkTaskTest {
+
+ @TempDir
+ Path tempDir;
+
+ @Test
+ public void testParameterReplacementInScript() throws Exception {
+ String executePath = tempDir.toString();
+ String taskAppId = "test-app";
+
+ FlinkParameters flinkParameters = new FlinkParameters();
+ flinkParameters.setProgramType(ProgramType.SQL);
+ flinkParameters.setDeployMode(FlinkDeployMode.LOCAL);
+ flinkParameters.setParallelism(2);
+ flinkParameters.setInitScript("set batch_size=${batch_size};");
+ flinkParameters.setRawScript("SELECT * FROM logs WHERE
dt='$[yyyyMMdd]';");
+
+ Map<String, Property> prepareParamsMap = new HashMap<>();
+ prepareParamsMap.put("batch_size", new Property("batch_size", null,
null, "1000"));
+ prepareParamsMap.put(PARAMETER_DATETIME, new
Property(PARAMETER_DATETIME, null, null, "20201201120000"));
+
+ TaskExecutionContext context = new TaskExecutionContext();
+ context.setTaskParams(JSONUtils.toJsonString(flinkParameters));
+ context.setExecutePath(executePath);
+ context.setTaskAppId(taskAppId);
+ context.setPrepareParamsMap(prepareParamsMap);
+
+ FlinkTask task = new FlinkTask(context);
+ task.init();
+ task.getScript();
+
+ String initScriptPath = String.format("%s/%s_init.sql", executePath,
taskAppId);
+ String nodeScriptPath = String.format("%s/%s_node.sql", executePath,
taskAppId);
+
+ String initContent = new
String(Files.readAllBytes(Paths.get(initScriptPath)), StandardCharsets.UTF_8);
+ String nodeContent = new
String(Files.readAllBytes(Paths.get(nodeScriptPath)), StandardCharsets.UTF_8);
+
+ String expectedInitOptions =
String.join(FlinkConstants.FLINK_SQL_NEWLINE,
+
FlinkArgsUtils.buildInitOptionsForSql(flinkParameters)).concat(FlinkConstants.FLINK_SQL_NEWLINE);
+ Assertions.assertEquals(expectedInitOptions + "set batch_size=1000;",
initContent);
+ Assertions.assertEquals("SELECT * FROM logs WHERE dt='20201201';",
nodeContent.trim());
+ }
+
+ @Test
+ public void testParameterReplacementWithNullParamsMap() throws Exception {
+ String executePath = tempDir.toString();
+ String taskAppId = "test-null-params";
+
+ FlinkParameters flinkParameters = new FlinkParameters();
+ flinkParameters.setProgramType(ProgramType.SQL);
+ flinkParameters.setDeployMode(FlinkDeployMode.LOCAL);
+ flinkParameters.setParallelism(2);
+ flinkParameters.setInitScript("");
+ flinkParameters.setRawScript("SELECT 1;");
+
+ TaskExecutionContext context = new TaskExecutionContext();
+ context.setTaskParams(JSONUtils.toJsonString(flinkParameters));
+ context.setExecutePath(executePath);
+ context.setTaskAppId(taskAppId);
+ context.setPrepareParamsMap(null);
+
+ FlinkTask task = new FlinkTask(context);
+ task.init();
+ String script = task.getScript();
+
+ String nodeScriptPath = String.format("%s/%s_node.sql", executePath,
taskAppId);
+ String nodeContent = new
String(Files.readAllBytes(Paths.get(nodeScriptPath)), StandardCharsets.UTF_8);
+ Assertions.assertEquals("SELECT 1;", nodeContent.trim());
+ Assertions.assertNotNull(script);
+ }
+}