This is an automated email from the ASF dual-hosted git repository.
zihaoxiang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 8df912aa7a [Improvement-16746][seatunnel] pass user defined task
parameter to seatunnel (#16756)
8df912aa7a is described below
commit 8df912aa7a4a0fbad38245cf001a65ac2cff3c47
Author: Jarvis <[email protected]>
AuthorDate: Tue Nov 5 17:23:30 2024 +0800
[Improvement-16746][seatunnel] pass user defined task parameter to
seatunnel (#16756)
---
docs/docs/en/guide/task/seatunnel.md | 11 ++++---
docs/docs/zh/guide/task/seatunnel.md | 5 +++-
.../plugin/task/seatunnel/SeatunnelTask.java | 28 ++++++++++++++++++
.../plugin/task/seatunnel/SeatunnelTaskTest.java | 34 ++++++++++++++++++++++
4 files changed, 71 insertions(+), 7 deletions(-)
diff --git a/docs/docs/en/guide/task/seatunnel.md
b/docs/docs/en/guide/task/seatunnel.md
index a622ab49ef..52eec581ca 100644
--- a/docs/docs/en/guide/task/seatunnel.md
+++ b/docs/docs/en/guide/task/seatunnel.md
@@ -26,17 +26,16 @@ Click [here](https://seatunnel.apache.org/) for more
information about `Apache S
- SEATUNNEL_ENGINE
- Deployment mode: specify the deployment mode, `cluster` `local`
- > Click
[here](https://seatunnel.apache.org/docs/2.3.3/command/usage) for more
information on the usage of
-
-`Apache SeaTunnel command`
+ > Click [here](https://seatunnel.apache.org/docs/2.3.3/command/usage) for
more information on the usage of `Apache SeaTunnel command`
- Custom Configuration: Supports custom configuration or select configuration
file from Resource Center
- > Click [here](https://seatunnel.apache.org/docs/2.3.3/concept/config) for
more information about `Apache
- >
- >> SeaTunnel config` file
+ > Click [here](https://seatunnel.apache.org/docs/2.3.3/concept/config) for
more information about `Apache SeaTunnel config` file
- Script: Customize configuration information on the task node, including four
parts: `env` `source` `transform` `sink`
+- Custom Parameters/Global Parameters: When custom parameters (direction
`IN`) or global parameters are defined, they are passed to the SeaTunnel task,
and their values are substituted at task execution time wherever the SeaTunnel
configuration references a parameter as `${parameter_name}`.
+
+ > Click
[here](https://seatunnel.apache.org/docs/concept/config/#config-variable-substitution)
for more information on `Apache SeaTunnel variable substitution`
## Task Example
diff --git a/docs/docs/zh/guide/task/seatunnel.md
b/docs/docs/zh/guide/task/seatunnel.md
index 48d0175f3e..4d640553fc 100644
--- a/docs/docs/zh/guide/task/seatunnel.md
+++ b/docs/docs/zh/guide/task/seatunnel.md
@@ -26,13 +26,16 @@
- SEATUNNEL_ENGINE
- 部署方式:指定部署模式,`cluster` `local`
- > 点击 [这里](https://seatunnel.apache.org/docs/2.3.3/command/usage)
获取更多关于`Apache SeaTunnel command` 使用的信息
+ > 点击 [这里](https://seatunnel.apache.org/docs/2.3.3/command/usage)
获取更多关于`Apache SeaTunnel command` 使用的信息
- 自定义配置:支持自定义配置或从资源中心选择配置文件
> 点击 [这里](https://seatunnel.apache.org/docs/2.3.3/concept/config)
获取更多关于`Apache SeaTunnel config` 文件介绍
- 脚本:在任务节点那自定义配置信息,包括四部分:`env` `source` `transform` `sink`
+- 自定义参数/全局参数:当定义了自定义参数(方向为 `IN`)或全局参数时,这些参数会被传递给 SeaTunnel 任务,
可以在 SeaTunnel 配置中通过 `${参数名}` 引用参数,从而在任务运行时动态替换参数值。
+
+ > 点击
[这里](https://seatunnel.apache.org/docs/concept/config/#config-variable-substitution)
获取更多关于`Apache SeaTunnel 变量替换` 使用的信息
## 任务样例
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java
index b5b85427f7..da9efbfec0 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java
@@ -27,6 +27,7 @@ import
org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
import
org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
@@ -45,6 +46,7 @@ import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -169,9 +171,35 @@ public class SeatunnelTask extends AbstractRemoteTask {
String filePath = buildConfigFilePath();
createConfigFileIfNotExists(scriptContent, filePath);
args.add(filePath);
+ args.addAll(generateTaskParameters());
return args;
}
+ private List<String> generateTaskParameters() {
+ Map<String, String> variables = new HashMap<>();
+ Map<String, Property> paramsMap =
taskExecutionContext.getPrepareParamsMap();
+ List<Property> propertyList =
JSONUtils.toList(taskExecutionContext.getGlobalParams(), Property.class);
+ if (propertyList != null && !propertyList.isEmpty()) {
+ for (Property property : propertyList) {
+ variables.put(property.getProp(),
paramsMap.get(property.getProp()).getValue());
+ }
+ }
+ List<Property> localParams = this.seatunnelParameters.getLocalParams();
+ if (localParams != null && !localParams.isEmpty()) {
+ for (Property property : localParams) {
+ if (property.getDirect().equals(Direct.IN)) {
+ variables.put(property.getProp(),
paramsMap.get(property.getProp()).getValue());
+ }
+ }
+ }
+ List<String> parameters = new ArrayList<>();
+ variables.forEach((k, v) -> {
+ parameters.add("-i");
+ parameters.add(String.format("%s='%s'", k, v));
+ });
+ return parameters;
+ }
+
private String buildCustomConfigContent() {
log.info("raw custom config content : {}",
seatunnelParameters.getRawScript());
String script =
seatunnelParameters.getRawScript().replaceAll("\\r\\n", System.lineSeparator());
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/test/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTaskTest.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/test/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTaskTest.java
index 11fffedd80..3268f6ee75 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/test/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTaskTest.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/test/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTaskTest.java
@@ -19,12 +19,17 @@ package org.apache.dolphinscheduler.plugin.task.seatunnel;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.enums.DataType;
+import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
+import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import org.apache.dolphinscheduler.plugin.task.api.resource.ResourceContext;
import org.apache.commons.io.FileUtils;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.List;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
@@ -99,6 +104,35 @@ public class SeatunnelTaskTest {
Assertions.assertEquals(expectedCommand, command);
}
+ @Test
+ public void testParameterPass() throws Exception {
+ String taskId = "3456";
+ SeatunnelParameters seatunnelParameters = new SeatunnelParameters();
+ seatunnelParameters.setUseCustom(false);
+ ResourceInfo resourceInfo = new ResourceInfo();
+ resourceInfo.setResourceName(RESOURCE_SCRIPT_PATH);
+ List<Property> localParam = new ArrayList<>();
+ Property property = new Property("key1", Direct.IN, DataType.VARCHAR,
"value1");
+ localParam.add(property);
+ seatunnelParameters.setLocalParams(localParam);
+
seatunnelParameters.setResourceList(Collections.singletonList(resourceInfo));
+
+ TaskExecutionContext taskExecutionContext = new TaskExecutionContext();
+ taskExecutionContext.setExecutePath(EXECUTE_PATH);
+ taskExecutionContext.setTaskAppId(taskId);
+
taskExecutionContext.setTaskParams(JSONUtils.toJsonString(seatunnelParameters));
+ ResourceContext resourceContext = new ResourceContext();
+ resourceContext.addResourceItem(new
ResourceContext.ResourceItem(RESOURCE_SCRIPT_PATH, RESOURCE_SCRIPT_PATH));
+ taskExecutionContext.setResourceContext(resourceContext);
+
taskExecutionContext.setPrepareParamsMap(Collections.singletonMap("key1",
property));
+
+ SeatunnelTask seatunnelTask = new SeatunnelTask(taskExecutionContext);
+ seatunnelTask.setSeatunnelParameters(seatunnelParameters);
+ String command = String.join(" ", seatunnelTask.buildOptions());
+ String expectedCommand = String.format("--config %s/seatunnel_%s.conf
-i key1='value1'", EXECUTE_PATH, taskId);
+ Assertions.assertEquals(expectedCommand, command);
+ }
+
private static final String RAW_SCRIPT = "env {\n" +
" execution.parallelism = 2\n" +
" job.mode = \"BATCH\"\n" +