This is an automated email from the ASF dual-hosted git repository.

zihaoxiang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 8df912aa7a [Improvement-16746][seatunnel] pass user defined task 
parameter to seatunnel (#16756)
8df912aa7a is described below

commit 8df912aa7a4a0fbad38245cf001a65ac2cff3c47
Author: Jarvis <[email protected]>
AuthorDate: Tue Nov 5 17:23:30 2024 +0800

    [Improvement-16746][seatunnel] pass user defined task parameter to 
seatunnel (#16756)
---
 docs/docs/en/guide/task/seatunnel.md               | 11 ++++---
 docs/docs/zh/guide/task/seatunnel.md               |  5 +++-
 .../plugin/task/seatunnel/SeatunnelTask.java       | 28 ++++++++++++++++++
 .../plugin/task/seatunnel/SeatunnelTaskTest.java   | 34 ++++++++++++++++++++++
 4 files changed, 71 insertions(+), 7 deletions(-)

diff --git a/docs/docs/en/guide/task/seatunnel.md 
b/docs/docs/en/guide/task/seatunnel.md
index a622ab49ef..52eec581ca 100644
--- a/docs/docs/en/guide/task/seatunnel.md
+++ b/docs/docs/en/guide/task/seatunnel.md
@@ -26,17 +26,16 @@ Click [here](https://seatunnel.apache.org/) for more 
information about `Apache S
 - SEATUNNEL_ENGINE
 - Deployment mode: specify the deployment mode, `cluster` `local`
 
-          > Click 
[here](https://seatunnel.apache.org/docs/2.3.3/command/usage) for more 
information on the usage of 
-
-`Apache SeaTunnel command`
+  > Click [here](https://seatunnel.apache.org/docs/2.3.3/command/usage) for 
more information on the usage of `Apache SeaTunnel command`
 
 - Custom Configuration: Supports custom configuration or select configuration 
file from Resource Center
 
-  > Click [here](https://seatunnel.apache.org/docs/2.3.3/concept/config) for 
more information about `Apache
-  >
-  >> SeaTunnel config` file
+  > Click [here](https://seatunnel.apache.org/docs/2.3.3/concept/config) for 
more information about `Apache SeaTunnel config` file
 
 - Script: Customize configuration information on the task node, including four 
parts: `env` `source` `transform` `sink`
+- Custom Parameters/Global Parameters: When custom parameters/global 
parameters are defined, the parameters will be passed to the SeaTunnel task, 
and the parameter value can be dynamically replaced during task execution by 
referencing the parameter with `${}` in the SeaTunnel task.
+
+  > Click 
[here](https://seatunnel.apache.org/docs/concept/config/#config-variable-substitution)
 for more information on `Apache SeaTunnel variable substitution`
 
 ## Task Example
 
diff --git a/docs/docs/zh/guide/task/seatunnel.md 
b/docs/docs/zh/guide/task/seatunnel.md
index 48d0175f3e..4d640553fc 100644
--- a/docs/docs/zh/guide/task/seatunnel.md
+++ b/docs/docs/zh/guide/task/seatunnel.md
@@ -26,13 +26,16 @@
 - SEATUNNEL_ENGINE
 - 部署方式:指定部署模式,`cluster` `local`
 
-          > 点击 [这里](https://seatunnel.apache.org/docs/2.3.3/command/usage) 
获取更多关于`Apache SeaTunnel command` 使用的信息
+  > 点击 [这里](https://seatunnel.apache.org/docs/2.3.3/command/usage) 
获取更多关于`Apache SeaTunnel command` 使用的信息
 
 - 自定义配置:支持自定义配置或从资源中心选择配置文件
 
   > 点击 [这里](https://seatunnel.apache.org/docs/2.3.3/concept/config) 
获取更多关于`Apache SeaTunnel config` 文件介绍
 
 - 脚本:在任务节点那自定义配置信息,包括四部分:`env` `source` `transform` `sink`
+- 自定义参数/全局参数: 当定义了自定义参数/全局参数时, 会将该参数传递给SeaTunnel任务, 
可以在SeaTunnel任务中通过`${}`引用该参数, 从而在任务运行时动态替换参数值.
+
+  > 点击 
[这里](https://seatunnel.apache.org/docs/concept/config/#config-variable-substitution)
 获取更多关于`Apache SeaTunnel 变量替换` 使用的信息
 
 ## 任务样例
 
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java
index b5b85427f7..da9efbfec0 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java
@@ -27,6 +27,7 @@ import 
org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
 import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
 import org.apache.dolphinscheduler.plugin.task.api.TaskException;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
 import org.apache.dolphinscheduler.plugin.task.api.model.Property;
 import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
 import 
org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
@@ -45,6 +46,7 @@ import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -169,9 +171,35 @@ public class SeatunnelTask extends AbstractRemoteTask {
         String filePath = buildConfigFilePath();
         createConfigFileIfNotExists(scriptContent, filePath);
         args.add(filePath);
+        args.addAll(generateTaskParameters());
         return args;
     }
 
+    /**
+     * Builds the SeaTunnel variable-substitution options for this task.
+     *
+     * <p>Every global parameter, and every local parameter whose direction is
+     * {@link Direct#IN}, is looked up by name in the prepared parameter map of
+     * the task execution context and emitted as the token pair {@code -i} and
+     * {@code name='value'}. A name that occurs more than once is emitted once.
+     *
+     * <p>A parameter that has no entry in the prepared parameter map is skipped
+     * with a warning instead of failing the task with a
+     * {@link NullPointerException}.
+     *
+     * @return the flat list of option tokens; empty when nothing can be passed
+     */
+    private List<String> generateTaskParameters() {
+        List<String> parameters = new ArrayList<>();
+        Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
+        if (paramsMap == null || paramsMap.isEmpty()) {
+            // Nothing has been resolved for this task, so there is nothing to pass on.
+            return parameters;
+        }
+
+        Map<String, String> variables = new HashMap<>();
+        List<Property> globalParams =
+                JSONUtils.toList(taskExecutionContext.getGlobalParams(), Property.class);
+        if (globalParams != null) {
+            for (Property property : globalParams) {
+                putResolvedValue(variables, paramsMap, property);
+            }
+        }
+
+        List<Property> localParams = this.seatunnelParameters.getLocalParams();
+        if (localParams != null) {
+            for (Property property : localParams) {
+                // Constant-first comparison: a local parameter without a direction
+                // is treated as "not IN" rather than raising a NullPointerException.
+                if (property != null && Direct.IN.equals(property.getDirect())) {
+                    putResolvedValue(variables, paramsMap, property);
+                }
+            }
+        }
+
+        variables.forEach((name, value) -> {
+            parameters.add("-i");
+            parameters.add(String.format("%s='%s'", name, value));
+        });
+        return parameters;
+    }
+
+    /**
+     * Copies the prepared value of {@code property} into {@code variables} when
+     * the prepared parameter map holds an entry for its name.
+     *
+     * @param variables target map from parameter name to the value to pass
+     * @param paramsMap prepared parameters of the task, keyed by name; not null
+     * @param property  the declared parameter to resolve; ignored when null
+     */
+    private void putResolvedValue(Map<String, String> variables,
+                                  Map<String, Property> paramsMap,
+                                  Property property) {
+        if (property == null) {
+            return;
+        }
+        Property resolved = paramsMap.get(property.getProp());
+        if (resolved == null) {
+            log.warn("parameter {} has no prepared value, it will not be passed to seatunnel",
+                    property.getProp());
+            return;
+        }
+        variables.put(property.getProp(), resolved.getValue());
+    }
+
     private String buildCustomConfigContent() {
         log.info("raw custom config content : {}", 
seatunnelParameters.getRawScript());
         String script = 
seatunnelParameters.getRawScript().replaceAll("\\r\\n", System.lineSeparator());
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/test/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTaskTest.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/test/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTaskTest.java
index 11fffedd80..3268f6ee75 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/test/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTaskTest.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/test/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTaskTest.java
@@ -19,12 +19,17 @@ package org.apache.dolphinscheduler.plugin.task.seatunnel;
 
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.enums.DataType;
+import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
+import org.apache.dolphinscheduler.plugin.task.api.model.Property;
 import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
 import org.apache.dolphinscheduler.plugin.task.api.resource.ResourceContext;
 
 import org.apache.commons.io.FileUtils;
 
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
@@ -99,6 +104,35 @@ public class SeatunnelTaskTest {
         Assertions.assertEquals(expectedCommand, command);
     }
 
+    /**
+     * Checks that a local parameter with direction IN, which is also present in
+     * the prepared parameter map, is appended to the built options as
+     * {@code -i key1='value1'} after the {@code --config} option.
+     */
+    @Test
+    public void testParameterPass() throws Exception {
+        final String appId = "3456";
+
+        // The single IN parameter that is expected to reach the command line.
+        Property inParam = new Property("key1", Direct.IN, DataType.VARCHAR, "value1");
+        List<Property> localParams = new ArrayList<>();
+        localParams.add(inParam);
+
+        ResourceInfo scriptResource = new ResourceInfo();
+        scriptResource.setResourceName(RESOURCE_SCRIPT_PATH);
+
+        SeatunnelParameters parameters = new SeatunnelParameters();
+        parameters.setUseCustom(false);
+        parameters.setLocalParams(localParams);
+        parameters.setResourceList(Collections.singletonList(scriptResource));
+
+        ResourceContext resources = new ResourceContext();
+        resources.addResourceItem(
+                new ResourceContext.ResourceItem(RESOURCE_SCRIPT_PATH, RESOURCE_SCRIPT_PATH));
+
+        TaskExecutionContext context = new TaskExecutionContext();
+        context.setExecutePath(EXECUTE_PATH);
+        context.setTaskAppId(appId);
+        context.setTaskParams(JSONUtils.toJsonString(parameters));
+        context.setResourceContext(resources);
+        context.setPrepareParamsMap(Collections.singletonMap("key1", inParam));
+
+        SeatunnelTask task = new SeatunnelTask(context);
+        task.setSeatunnelParameters(parameters);
+
+        String actual = String.join(" ", task.buildOptions());
+        String expected =
+                String.format("--config %s/seatunnel_%s.conf -i key1='value1'", EXECUTE_PATH, appId);
+        Assertions.assertEquals(expected, actual);
+    }
+
     private static final String RAW_SCRIPT = "env {\n" +
             "  execution.parallelism = 2\n" +
             "  job.mode = \"BATCH\"\n" +

Reply via email to