This is an automated email from the ASF dual-hosted git repository.
zihaoxiang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new f21550ffc4 [Improvement-17861][zeppelin task plugin] support zeppelin
tasks with parameters parsing (#17862)
f21550ffc4 is described below
commit f21550ffc458a9875087c5b4566080715d6f6237
Author: walt <[email protected]>
AuthorDate: Fri Jan 9 10:36:42 2026 +0800
[Improvement-17861][zeppelin task plugin] support zeppelin tasks with
parameters parsing (#17862)
---
docs/docs/en/guide/parameter/context.md | 11 +++++++++
docs/docs/en/guide/task/zeppelin.md | 12 +++++++++-
docs/docs/zh/guide/parameter/context.md | 11 +++++++++
docs/docs/zh/guide/task/zeppelin.md | 12 +++++++++-
.../new_ui/dev/parameter/zeppelin_parameters01.png | Bin 0 -> 144726 bytes
.../plugin/task/zeppelin/ZeppelinTask.java | 25 ++++++++++++++++++---
.../plugin/task/zeppelin/ZeppelinTaskTest.java | 18 +++++++++++++++
7 files changed, 84 insertions(+), 5 deletions(-)
diff --git a/docs/docs/en/guide/parameter/context.md
b/docs/docs/en/guide/parameter/context.md
index 849714c5c5..8fae320c35 100644
--- a/docs/docs/en/guide/parameter/context.md
+++ b/docs/docs/en/guide/parameter/context.md
@@ -16,6 +16,7 @@ DolphinScheduler allows parameter transfer between tasks.
Currently, transfer di
* [Python](../task/python.md)
* [SubWorkflow](../task/sub-workflow.md)
* [Kubernetes](../task/kubernetes.md)
+* [zeppelin](../task/zeppelin.md)
When defining an upstream node, if there is a need to transmit the result of
that node to a dependency related downstream node. You need to set an `OUT`
direction parameter to [Custom Parameters] of the [Current Node Settings]. If
it is a sub-workflow node, there is no need to set a parameter in [Current Node
Settings], but an `OUT` direction parameter needs to be set in the workflow
definition of the sub-workflow.
@@ -140,3 +141,13 @@ For example

Another special consideration, not always can DolphinScheduler collect pod
logs, if the user redirects the log output stream, DolphinScheduler can not
collect logs for use and can not use the output parameter, either.
+
+#### Pass parameter from Zeppelin task to downstream
+
+In the custom parameters, add an `IN` type `input` parameter.
+
+For example
+
+
+
+Note: json key must be enclosed in " "
diff --git a/docs/docs/en/guide/task/zeppelin.md
b/docs/docs/en/guide/task/zeppelin.md
index 589be18b7e..58d3365720 100644
--- a/docs/docs/en/guide/task/zeppelin.md
+++ b/docs/docs/en/guide/task/zeppelin.md
@@ -27,7 +27,17 @@ it will call `Zeppelin Client API` to trigger zeppelin
notebook paragraph. Click
| Zeppelin password | the login password of your zeppelin
server .
|
| |
| Zeppelin Rest Endpoint | The REST endpoint of your zeppelin
server .
|
-| Zeppelin Parameters | Parameters in json format used for
zeppelin dynamic form.
|
+| Zeppelin Parameters | Parameters in json format used for
zeppelin dynamic form, which will replace the content with `${variable}` in the
script. |
+
+## Task Output Parameters
+
+| **Task Parameter** | **Description** |
+|--------------------|----------------------------------|
+| result | VARCHAR, zeppelin execute result |
+
+Can use `${taskName.result}` to reference task output parameters in downstream
tasks.
+
+For example, if the current task1 is a zeppelin task, the downstream task can
use `${task1.result}` to reference the output parameters of task1.
## Production (Clone) Mode
diff --git a/docs/docs/zh/guide/parameter/context.md
b/docs/docs/zh/guide/parameter/context.md
index 76a9263254..160e833f8e 100644
--- a/docs/docs/zh/guide/parameter/context.md
+++ b/docs/docs/zh/guide/parameter/context.md
@@ -16,6 +16,7 @@ DolphinScheduler 允许在任务间进行参数传递,目前传递方向仅支
* [Python](../task/python.md)
* [SubWorkflow](../task/sub-workflow.md)
* [Kubernetes](../task/kubernetes.md)
+* [zeppelin](../task/zeppelin.md)
当定义上游节点时,如果有需要将该节点的结果传递给有依赖关系的下游节点,需要在【当前节点设置】的【自定义参数】设置一个方向是 OUT 的变量。如果是
SubWorkflow 节点无需在【当前节点设置】中设置变量,需要在子流程的工作流定义中设置一个方向是 OUT 的变量。
@@ -137,3 +138,13 @@ Node_mysql 运行结果如下:

另外需要特别注意的是,并非总是可以收集pod日志,如果用户重定向日志输出流,我们既不能收集日志使用,也不能使用输出参数。
+
+#### Zeppelin 任务传递参数
+
+在自定义参数中,添加一个 IN 类型的 input 参数。
+
+如下图所示:
+
+
+
+注意:JSON 键名必须用双引号括起来。
diff --git a/docs/docs/zh/guide/task/zeppelin.md
b/docs/docs/zh/guide/task/zeppelin.md
index 579f1a7427..5ade0fa8a6 100644
--- a/docs/docs/zh/guide/task/zeppelin.md
+++ b/docs/docs/zh/guide/task/zeppelin.md
@@ -25,7 +25,17 @@
| Zeppelin username | 您的Zeppelin服务的登陆用户名
|
| Zeppelin password | 您的Zeppelin服务的登陆密码
|
| Zeppelin Production Note Directory | 生产模式下存放克隆note的目录
|
-| Zeppelin Parameters | 用于传入Zeppelin Dynamic Form的参数
|
+| Zeppelin Parameters | 用于传入Zeppelin Dynamic Form的参数,会替换脚本中以
${变量} 的内容 |
+
+## 任务输出参数
+
+| **任务参数** | **描述** |
+|----------|-----------------------|
+| result | VARCHAR, zeppelin执行结果 |
+
+可以在下游任务中使用 ${taskName.result} 引用任务输出参数。
+
+如,当前task1为zeppelin任务, 下游任务可以使用 `${task1.result}` 引用task1的输出参数
## 生产(克隆)模式
diff --git a/docs/img/new_ui/dev/parameter/zeppelin_parameters01.png
b/docs/img/new_ui/dev/parameter/zeppelin_parameters01.png
new file mode 100644
index 0000000000..24f76ae1f0
Binary files /dev/null and
b/docs/img/new_ui/dev/parameter/zeppelin_parameters01.png differ
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java
index c3b580de35..8f0dd2896d 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java
@@ -26,7 +26,11 @@ import
org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.enums.DataType;
+import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
+import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import
org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
import org.apache.dolphinscheduler.spi.enums.DbType;
import org.apache.commons.lang3.StringUtils;
@@ -107,10 +111,14 @@ public class ZeppelinTask extends AbstractRemoteTask {
// noteId may be replaced with cloned noteId
String noteId = this.zeppelinParameters.getNoteId();
Map<String, String> zeppelinParamsMap = new HashMap<>();
- if (parameters != null) {
+ if (StringUtils.isNotBlank(parameters)) {
+ Map<String, Property> paramsMap =
taskExecutionContext.getPrepareParamsMap();
+ String replacedParams =
+
ParameterUtils.convertParameterPlaceholders(parameters,
ParameterUtils.convert(paramsMap));
ObjectMapper mapper = new ObjectMapper();
- zeppelinParamsMap = mapper.readValue(parameters, Map.class);
+ zeppelinParamsMap = mapper.readValue(replacedParams,
Map.class);
}
+ log.info("request zeppelin parameters:{}",
JSONUtils.toPrettyJsonString(zeppelinParamsMap));
// Submit zeppelin task
String resultContent;
@@ -149,7 +157,8 @@ public class ZeppelinTask extends AbstractRemoteTask {
if (productionNoteDirectory != null) {
this.zClient.deleteNote(noteId);
}
-
+ // add response to out var poll
+ addDefaultOutput(resultContent);
// Use noteId-paragraph-Id as app id
final int exitStatusCode = mapStatusToExitCode(status);
setAppIds(String.format("%s-%s", noteId, paragraphId));
@@ -245,4 +254,14 @@ public class ZeppelinTask extends AbstractRemoteTask {
return Collections.emptyList();
}
+ public void addDefaultOutput(String response) {
+ // put response in output
+ Property outputProperty = new Property();
+ outputProperty.setProp(String.format("%s.%s",
taskExecutionContext.getTaskName(), "result"));
+ outputProperty.setDirect(Direct.OUT);
+ outputProperty.setType(DataType.VARCHAR);
+ outputProperty.setValue(response);
+ zeppelinParameters.addPropertyToValPool(outputProperty);
+ }
+
}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java
index ffaf85b207..1685b49585 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java
@@ -33,7 +33,10 @@ import
org.apache.dolphinscheduler.plugin.datasource.zeppelin.param.ZeppelinConn
import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.enums.DataType;
+import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
import org.apache.dolphinscheduler.plugin.task.api.model.ApplicationInfo;
+import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import
org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper;
import org.apache.zeppelin.client.NoteResult;
@@ -41,6 +44,7 @@ import org.apache.zeppelin.client.ParagraphResult;
import org.apache.zeppelin.client.Status;
import org.apache.zeppelin.client.ZeppelinClient;
+import java.util.List;
import java.util.Map;
import org.junit.jupiter.api.AfterEach;
@@ -227,6 +231,20 @@ public class ZeppelinTaskTest {
}
}
+ @Test
+ public void testAddDefaultOutput() throws Exception {
+ String response = "zeppelin-result";
+ this.zeppelinTask.addDefaultOutput(response);
+
+ List<Property> varPool =
this.zeppelinTask.getParameters().getVarPool();
+ Assertions.assertEquals(1, varPool.size());
+ Property property = varPool.get(0);
+ Assertions.assertEquals("null.result", property.getProp());
+ Assertions.assertEquals(Direct.OUT, property.getDirect());
+ Assertions.assertEquals(DataType.VARCHAR, property.getType());
+ Assertions.assertEquals(response, property.getValue());
+ }
+
private String buildZeppelinTaskParameters() {
ZeppelinParameters zeppelinParameters = new ZeppelinParameters();
zeppelinParameters.setNoteId(MOCK_NOTE_ID);