This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 0a00aa44e5 [Improvement-17986][task-plugin] Support parameter replacement in Flink and FlinkStream task (#17987)
0a00aa44e5 is described below

commit 0a00aa44e5f3641515c3955f93c851e8bf9b8d4b
Author: macdoor <[email protected]>
AuthorDate: Sun Mar 1 18:02:51 2026 +0800

    [Improvement-17986][task-plugin] Support parameter replacement in Flink and FlinkStream task (#17987)
---
 .../plugin/task/flink/FlinkStreamTask.java         |  12 +--
 .../plugin/task/flink/FlinkStreamTaskTest.java     | 110 +++++++++++++++++++++
 .../plugin/task/flink/FlinkTask.java               |  34 ++++++-
 .../plugin/task/flink/FlinkTaskTest.java           | 107 ++++++++++++++++++++
 4 files changed, 248 insertions(+), 15 deletions(-)

diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTask.java
index 3c2c1e046b..a9ce0fe4d8 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTask.java
@@ -28,7 +28,6 @@ import org.apache.commons.collections4.CollectionUtils;
 
 import java.io.IOException;
 import java.util.List;
-import java.util.stream.Collectors;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -53,20 +52,11 @@ public class FlinkStreamTask extends FlinkTask implements StreamTask {
         if (flinkParameters == null || !flinkParameters.checkParameters()) {
             throw new RuntimeException("flink task params is not valid");
         }
-
-        FileUtils.generateScriptFile(taskExecutionContext, flinkParameters);
     }
 
-    /**
-     * create command
-     *
-     * @return command
-     */
     @Override
     protected String getScript() {
-        // flink run/run-application [OPTIONS] <jar-file> <arguments>
-        List<String> args = FlinkArgsUtils.buildRunCommandLine(taskExecutionContext, flinkParameters);
-        return args.stream().collect(Collectors.joining(" "));
+        return buildScriptWithParameterReplacement(flinkParameters);
     }
 
     @Override
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTaskTest.java
new file mode 100644
index 0000000000..46fd73ada2
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTaskTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.flink;
+
+import static org.apache.dolphinscheduler.common.constants.DateConstants.PARAMETER_DATETIME;
+
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.model.Property;
+
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+public class FlinkStreamTaskTest {
+
+    @TempDir
+    Path tempDir;
+
+    @Test
+    public void testParameterReplacementInScript() throws Exception {
+        String executePath = tempDir.toString();
+        String taskAppId = "test-app";
+
+        FlinkStreamParameters flinkParameters = new FlinkStreamParameters();
+        flinkParameters.setProgramType(ProgramType.SQL);
+        flinkParameters.setDeployMode(FlinkDeployMode.LOCAL);
+        flinkParameters.setParallelism(2);
+        flinkParameters.setInitScript("SET 'date' = '${bizdate}';");
+        flinkParameters.setRawScript("SELECT * FROM logs WHERE dt = '${bizdate}' AND env = '${env}'");
+
+        Map<String, Property> prepareParamsMap = new HashMap<>();
+        prepareParamsMap.put("bizdate", new Property("bizdate", null, null, "20250601"));
+        prepareParamsMap.put("env", new Property("env", null, null, "prod"));
+
+        TaskExecutionContext context = new TaskExecutionContext();
+        context.setTaskParams(JSONUtils.toJsonString(flinkParameters));
+        context.setExecutePath(executePath);
+        context.setTaskAppId(taskAppId);
+        context.setPrepareParamsMap(prepareParamsMap);
+
+        FlinkStreamTask task = new FlinkStreamTask(context);
+        task.init();
+        task.getScript();
+
+        String initScriptPath = String.format("%s/%s_init.sql", executePath, taskAppId);
+        String nodeScriptPath = String.format("%s/%s_node.sql", executePath, taskAppId);
+
+        String initContent = new String(Files.readAllBytes(Paths.get(initScriptPath)), StandardCharsets.UTF_8);
+        String nodeContent = new String(Files.readAllBytes(Paths.get(nodeScriptPath)), StandardCharsets.UTF_8);
+
+        String expectedInitOptions = String.join(FlinkConstants.FLINK_SQL_NEWLINE,
+                FlinkArgsUtils.buildInitOptionsForSql(flinkParameters)).concat(FlinkConstants.FLINK_SQL_NEWLINE);
+        Assertions.assertEquals(expectedInitOptions + "SET 'date' = '20250601';", initContent);
+        Assertions.assertEquals("SELECT * FROM logs WHERE dt = '20250601' AND env = 'prod'", nodeContent.trim());
+    }
+
+    @Test
+    public void testParameterReplacementTimePlaceholder() throws Exception {
+        String executePath = tempDir.toString();
+        String taskAppId = "test-time";
+
+        FlinkStreamParameters flinkParameters = new FlinkStreamParameters();
+        flinkParameters.setProgramType(ProgramType.SQL);
+        flinkParameters.setDeployMode(FlinkDeployMode.LOCAL);
+        flinkParameters.setParallelism(2);
+        flinkParameters.setInitScript("");
+        flinkParameters.setRawScript("INSERT INTO t SELECT * FROM s WHERE dt = '$[yyyyMMdd]'");
+
+        Map<String, Property> prepareParamsMap = new HashMap<>();
+        prepareParamsMap.put(PARAMETER_DATETIME, new Property(PARAMETER_DATETIME, null, null, "20210815080000"));
+
+        TaskExecutionContext context = new TaskExecutionContext();
+        context.setTaskParams(JSONUtils.toJsonString(flinkParameters));
+        context.setExecutePath(executePath);
+        context.setTaskAppId(taskAppId);
+        context.setPrepareParamsMap(prepareParamsMap);
+
+        FlinkStreamTask task = new FlinkStreamTask(context);
+        task.init();
+        task.getScript();
+
+        String nodeScriptPath = String.format("%s/%s_node.sql", executePath, taskAppId);
+        String nodeContent = new String(Files.readAllBytes(Paths.get(nodeScriptPath)), StandardCharsets.UTF_8);
+
+        Assertions.assertEquals("INSERT INTO t SELECT * FROM s WHERE dt = '20210815'", nodeContent.trim());
+    }
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java
index 4b3d9f98e1..6f49ea75c5 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java
@@ -21,9 +21,14 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.plugin.task.api.AbstractYarnTask;
 import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.model.Property;
 import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
+
+import org.apache.commons.lang3.StringUtils;
 
 import java.util.List;
+import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
@@ -62,8 +67,6 @@ public class FlinkTask extends AbstractYarnTask {
         if (flinkParameters == null || !flinkParameters.checkParameters()) {
             throw new RuntimeException("flink task params is not valid");
         }
-
-        FileUtils.generateScriptFile(taskExecutionContext, flinkParameters);
     }
 
     /**
@@ -73,8 +76,31 @@ public class FlinkTask extends AbstractYarnTask {
      */
     @Override
     protected String getScript() {
-        // flink run/run-application [OPTIONS] <jar-file> <arguments>
-        List<String> args = FlinkArgsUtils.buildRunCommandLine(taskExecutionContext, flinkParameters);
+        return buildScriptWithParameterReplacement(flinkParameters);
+    }
+
+    /**
+     * Apply parameter replacement to initScript/rawScript, generate script files and build run command.
+     *
+     * @param params flink parameters
+     * @return run command string
+     */
+    protected String buildScriptWithParameterReplacement(FlinkParameters params) {
+        Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
+        Map<String, String> stringParams = ParameterUtils.convert(paramsMap);
+
+        if (StringUtils.isNotBlank(params.getInitScript())) {
+            params.setInitScript(
+                    ParameterUtils.convertParameterPlaceholders(params.getInitScript(), stringParams));
+        }
+        if (StringUtils.isNotBlank(params.getRawScript())) {
+            params.setRawScript(
+                    ParameterUtils.convertParameterPlaceholders(params.getRawScript(), stringParams));
+        }
+
+        FileUtils.generateScriptFile(taskExecutionContext, params);
+
+        List<String> args = FlinkArgsUtils.buildRunCommandLine(taskExecutionContext, params);
         return args.stream().collect(Collectors.joining(" "));
     }
 
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTaskTest.java
new file mode 100644
index 0000000000..5fea3294c1
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTaskTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.flink;
+
+import static org.apache.dolphinscheduler.common.constants.DateConstants.PARAMETER_DATETIME;
+
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.model.Property;
+
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+public class FlinkTaskTest {
+
+    @TempDir
+    Path tempDir;
+
+    @Test
+    public void testParameterReplacementInScript() throws Exception {
+        String executePath = tempDir.toString();
+        String taskAppId = "test-app";
+
+        FlinkParameters flinkParameters = new FlinkParameters();
+        flinkParameters.setProgramType(ProgramType.SQL);
+        flinkParameters.setDeployMode(FlinkDeployMode.LOCAL);
+        flinkParameters.setParallelism(2);
+        flinkParameters.setInitScript("set batch_size=${batch_size};");
+        flinkParameters.setRawScript("SELECT * FROM logs WHERE dt='$[yyyyMMdd]';");
+
+        Map<String, Property> prepareParamsMap = new HashMap<>();
+        prepareParamsMap.put("batch_size", new Property("batch_size", null, null, "1000"));
+        prepareParamsMap.put(PARAMETER_DATETIME, new Property(PARAMETER_DATETIME, null, null, "20201201120000"));
+
+        TaskExecutionContext context = new TaskExecutionContext();
+        context.setTaskParams(JSONUtils.toJsonString(flinkParameters));
+        context.setExecutePath(executePath);
+        context.setTaskAppId(taskAppId);
+        context.setPrepareParamsMap(prepareParamsMap);
+
+        FlinkTask task = new FlinkTask(context);
+        task.init();
+        task.getScript();
+
+        String initScriptPath = String.format("%s/%s_init.sql", executePath, taskAppId);
+        String nodeScriptPath = String.format("%s/%s_node.sql", executePath, taskAppId);
+
+        String initContent = new String(Files.readAllBytes(Paths.get(initScriptPath)), StandardCharsets.UTF_8);
+        String nodeContent = new String(Files.readAllBytes(Paths.get(nodeScriptPath)), StandardCharsets.UTF_8);
+
+        String expectedInitOptions = String.join(FlinkConstants.FLINK_SQL_NEWLINE,
+                FlinkArgsUtils.buildInitOptionsForSql(flinkParameters)).concat(FlinkConstants.FLINK_SQL_NEWLINE);
+        Assertions.assertEquals(expectedInitOptions + "set batch_size=1000;", initContent);
+        Assertions.assertEquals("SELECT * FROM logs WHERE dt='20201201';", nodeContent.trim());
+    }
+
+    @Test
+    public void testParameterReplacementWithNullParamsMap() throws Exception {
+        String executePath = tempDir.toString();
+        String taskAppId = "test-null-params";
+
+        FlinkParameters flinkParameters = new FlinkParameters();
+        flinkParameters.setProgramType(ProgramType.SQL);
+        flinkParameters.setDeployMode(FlinkDeployMode.LOCAL);
+        flinkParameters.setParallelism(2);
+        flinkParameters.setInitScript("");
+        flinkParameters.setRawScript("SELECT 1;");
+
+        TaskExecutionContext context = new TaskExecutionContext();
+        context.setTaskParams(JSONUtils.toJsonString(flinkParameters));
+        context.setExecutePath(executePath);
+        context.setTaskAppId(taskAppId);
+        context.setPrepareParamsMap(null);
+
+        FlinkTask task = new FlinkTask(context);
+        task.init();
+        String script = task.getScript();
+
+        String nodeScriptPath = String.format("%s/%s_node.sql", executePath, taskAppId);
+        String nodeContent = new String(Files.readAllBytes(Paths.get(nodeScriptPath)), StandardCharsets.UTF_8);
+        Assertions.assertEquals("SELECT 1;", nodeContent.trim());
+        Assertions.assertNotNull(script);
+    }
+}

Reply via email to