This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 71ee1f0c3d Support parse task output params under multiple log (#15244)
71ee1f0c3d is described below
commit 71ee1f0c3d2774e4100dc78829f5139bf6e0e71c
Author: Wenjun Ruan <[email protected]>
AuthorDate: Fri Dec 1 10:16:36 2023 +0800
Support parse task output params under multiple log (#15244)
---
docs/docs/en/guide/parameter/context.md | 2 +-
docs/docs/zh/guide/parameter/context.md | 2 +-
.../plugin/task/api/AbstractCommandExecutor.java | 80 +++++----------
.../plugin/task/api/AbstractTask.java | 26 ++---
.../plugin/task/api/AbstractYarnTask.java | 7 +-
.../plugin/task/api/ShellCommandExecutor.java | 10 +-
.../plugin/task/api/TaskConstants.java | 4 -
.../plugin/task/api/k8s/AbstractK8sTask.java | 11 ++-
.../task/api/k8s/AbstractK8sTaskExecutor.java | 16 +--
.../plugin/task/api/k8s/impl/K8sTaskExecutor.java | 18 ++--
.../plugin/task/api/loop/BaseLoopTaskExecutor.java | 2 +
.../task/api/parameters/AbstractParameters.java | 38 ++------
.../plugin/task/api/parameters/SqlParameters.java | 1 -
.../task/api/parser/TaskOutputParameterParser.java | 107 +++++++++++++++++++++
.../plugin/task/api/utils/VarPoolUtils.java | 47 ---------
.../plugin/task/api/AbstractTaskTest.java | 20 ----
.../plugin/task/api/k8s/K8sTaskExecutorTest.java | 9 +-
.../api/parser/TaskOutputParameterParserTest.java | 77 +++++++++++++++
.../plugin/task/api/utils/VarPoolUtilsTest.java | 50 ----------
.../test/resources/outputParam/emptyVarPoolLog.txt | 16 +++
.../test/resources/outputParam/multipleVarPoll.txt | 19 ++++
.../outputParam/oneVarPollInMultiLineLog.txt | 21 ++++
.../resources/outputParam/onelineVarPoolLog.txt | 18 ++++
.../plugin/task/chunjun/ChunJunTask.java | 9 +-
.../plugin/task/datafactory/DatafactoryTask.java | 2 +
.../plugin/task/dq/DataQualityTask.java | 3 +
.../plugin/task/datasync/DatasyncTask.java | 2 +
.../plugin/task/datax/DataxTask.java | 6 +-
.../plugin/task/dinky/DinkyTask.java | 14 +--
.../dolphinscheduler/plugin/task/dms/DmsTask.java | 3 +
.../dolphinscheduler/plugin/task/dvc/DvcTask.java | 10 +-
.../plugin/task/emr/AbstractEmrTask.java | 3 +
.../plugin/task/emr/EmrAddStepsTask.java | 3 +
.../plugin/task/emr/EmrJobFlowTask.java | 3 +
.../plugin/task/flink/FlinkStreamTask.java | 11 +--
.../plugin/task/flink/FlinkTask.java | 3 +
.../plugin/task/hivecli/HiveCliTask.java | 9 +-
.../plugin/task/http/HttpTask.java | 3 +
.../plugin/task/java/JavaTask.java | 9 +-
.../plugin/task/jupyter/JupyterTask.java | 7 +-
.../dolphinscheduler/plugin/task/k8s/K8sTask.java | 15 +--
.../plugin/task/k8s/K8sTaskTest.java | 4 +-
.../plugin/kubeflow/KubeflowTask.java | 3 +
.../plugin/task/linkis/LinkisTask.java | 12 +--
.../plugin/task/mlflow/MlflowTask.java | 10 +-
.../plugin/task/mr/MapReduceTask.java | 18 +---
.../plugin/task/openmldb/OpenmldbTask.java | 6 +-
.../plugin/task/pigeon/PigeonTask.java | 3 +
.../plugin/task/procedure/ProcedureTask.java | 18 ++--
.../plugin/task/python/PythonTask.java | 20 ++--
.../plugin/task/pytorch/PytorchTask.java | 9 +-
.../plugin/task/remoteshell/RemoteExecutor.java | 73 +++++---------
.../plugin/task/remoteshell/RemoteShellTask.java | 8 +-
.../plugin/task/sagemaker/SagemakerTask.java | 3 +
.../plugin/task/seatunnel/SeatunnelTask.java | 12 +--
.../plugin/task/shell/ShellTask.java | 12 +--
.../plugin/task/spark/SparkTask.java | 10 +-
.../dolphinscheduler/plugin/task/sql/SqlTask.java | 16 +--
.../plugin/task/sqoop/SqoopTask.java | 9 +-
.../plugin/task/zeppelin/ZeppelinTask.java | 13 +--
60 files changed, 495 insertions(+), 480 deletions(-)
diff --git a/docs/docs/en/guide/parameter/context.md
b/docs/docs/en/guide/parameter/context.md
index 15cf0f4aed..9d6131d92e 100644
--- a/docs/docs/en/guide/parameter/context.md
+++ b/docs/docs/en/guide/parameter/context.md
@@ -122,7 +122,7 @@ Although the two parameters var1 and var2 are output in the
A task, only the `OU
#### Pass parameter from Kubernetes task to downstream
-Different programming languages may use different logging frameworks in
Kubernetes tasks. To be compatible with these frameworks, DolphinScheduler
provides a universal logging data format `${(key=value)dsVal}`. Users can
output log data in the format `${(key=value)dsVal}` in the terminal logs of
their applications, where `key` is the corresponding parameter prop and `value`
is the value of that parameter. DolphinScheduler will capture the
`${(key=value)dsVal}` in the output logs to captu [...]
+Different programming languages may use different logging frameworks in
Kubernetes tasks. To be compatible with these frameworks, DolphinScheduler
provides a universal logging data format `${(key=value)}` or `#{(key=value)}`.
Users can output log data in the format in the terminal logs of their
applications, where `key` is the corresponding parameter prop and `value` is
the value of that parameter. DolphinScheduler will capture the `${(key=value)}`
or `#{(key=value)}` in the output logs [...]
For example
diff --git a/docs/docs/zh/guide/parameter/context.md
b/docs/docs/zh/guide/parameter/context.md
index f02f774142..7f5870458d 100644
--- a/docs/docs/zh/guide/parameter/context.md
+++ b/docs/docs/zh/guide/parameter/context.md
@@ -121,7 +121,7 @@ Node_mysql 运行结果如下:
#### Kubernetes 任务传递参数
-在Kubernetes任务中不同的程序开发语言可能会采用不同的日志框架,DolphinScheduler为了兼容不同的日志框架,提供了一种通用的日志数据格式,即
`${(key=value)dsVal}`,用户可以在应用程序的终端日志中输出以格式为 `${(key=value)dsVal}` 结束的日志数据,key
为对应参数的 prop,value 为该参数的值。DolphinScheduler会捕捉输出日志中的
`${(key=value)dsVal}`来进行参数捕捉,从而传递到下游。
+在Kubernetes任务中不同的程序开发语言可能会采用不同的日志框架,DolphinScheduler为了兼容不同的日志框架,提供了一种通用的日志数据格式,即
`${(key=value)}` 或 `#{(key=value)}`,用户可以在应用程序的终端日志中输出以这种格式的日志数据,key 为对应参数的
prop,value 为该参数的值。DolphinScheduler会捕捉输出日志中的 `${(key=value)}` 和 `#{(key=value)}`
来进行参数捕捉,从而传递到下游。
如下图所示:
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
index eef4c98e69..e54df42944 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
@@ -27,6 +27,7 @@ import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
+import
org.apache.dolphinscheduler.plugin.task.api.parser.TaskOutputParameterParser;
import org.apache.dolphinscheduler.plugin.task.api.shell.IShellInterceptor;
import
org.apache.dolphinscheduler.plugin.task.api.shell.IShellInterceptorBuilder;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
@@ -39,31 +40,25 @@ import org.apache.commons.lang3.StringUtils;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.lang.reflect.Field;
-import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.slf4j.Logger;
+import lombok.extern.slf4j.Slf4j;
import io.fabric8.kubernetes.client.dsl.LogWatch;
/**
* abstract command executor
*/
+@Slf4j
public abstract class AbstractCommandExecutor {
- /**
- * rules for extracting Var Pool
- */
- protected static final Pattern SETVALUE_REGEX =
Pattern.compile(TaskConstants.SETVALUE_REGEX);
-
- protected StringBuilder varPool = new StringBuilder();
+ protected volatile Map<String, String> taskOutputParams = new HashMap<>();
/**
* process
*/
@@ -74,11 +69,6 @@ public abstract class AbstractCommandExecutor {
*/
protected Consumer<LinkedBlockingQueue<String>> logHandler;
- /**
- * logger
- */
- protected Logger logger;
-
/**
* log list
*/
@@ -98,11 +88,9 @@ public abstract class AbstractCommandExecutor {
protected Future<?> podLogOutputFuture;
public AbstractCommandExecutor(Consumer<LinkedBlockingQueue<String>>
logHandler,
- TaskExecutionContext taskRequest,
- Logger logger) {
+ TaskExecutionContext taskRequest) {
this.logHandler = logHandler;
this.taskRequest = taskRequest;
- this.logger = logger;
this.logBuffer = new LinkedBlockingQueue<>();
this.logBuffer.add(EMPTY_STRING);
@@ -119,7 +107,7 @@ public abstract class AbstractCommandExecutor {
TaskResponse result = new TaskResponse();
int taskInstanceId = taskRequest.getTaskInstanceId();
if (null ==
TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId)) {
- logger.warn(
+ log.warn(
"Cannot find the taskInstance: {} from
TaskExecutionContextCacheManager, the task might already been killed",
taskInstanceId);
result.setExitStatusCode(EXIT_CODE_KILL);
@@ -180,7 +168,7 @@ public abstract class AbstractCommandExecutor {
return result;
}
// print process id
- logger.info("process start, process id is: {}", processId);
+ log.info("process start, process id is: {}", processId);
// if timeout occurs, exit directly
long remainTime = getRemainTime();
@@ -201,7 +189,7 @@ public abstract class AbstractCommandExecutor {
// Wait the task log process finished.
taskOutputFuture.get();
} catch (ExecutionException e) {
- logger.error("Handle task log error", e);
+ log.error("Handle task log error", e);
}
}
@@ -212,7 +200,7 @@ public abstract class AbstractCommandExecutor {
// delete pod after successful execution and log collection
ProcessUtils.cancelApplication(taskRequest);
} catch (ExecutionException e) {
- logger.error("Handle pod log error", e);
+ log.error("Handle pod log error", e);
}
}
@@ -223,21 +211,21 @@ public abstract class AbstractCommandExecutor {
result.setExitStatusCode(this.process.exitValue());
} else {
- logger.error("process has failure, the task timeout configuration
value is:{}, ready to kill ...",
+ log.error("process has failure, the task timeout configuration
value is:{}, ready to kill ...",
taskRequest.getTaskTimeout());
result.setExitStatusCode(EXIT_CODE_FAILURE);
cancelApplication();
}
int exitCode = this.process.exitValue();
String exitLogMessage = EXIT_CODE_KILL == exitCode ? "process has
killed." : "process has exited.";
- logger.info("{} execute path:{}, processId:{} ,exitStatusCode:{}
,processWaitForStatus:{} ,processExitValue:{}",
+ log.info("{} execute path:{}, processId:{} ,exitStatusCode:{}
,processWaitForStatus:{} ,processExitValue:{}",
exitLogMessage, taskRequest.getExecutePath(), processId,
result.getExitStatusCode(), status, exitCode);
return result;
}
- public String getVarPool() {
- return varPool.toString();
+ public Map<String, String> getTaskOutputParams() {
+ return taskOutputParams;
}
public void cancelApplication() throws InterruptedException {
@@ -246,16 +234,12 @@ public abstract class AbstractCommandExecutor {
}
// soft kill
- logger.info("Begin to kill process process, pid is : {}",
taskRequest.getProcessId());
+ log.info("Begin to kill process process, pid is : {}",
taskRequest.getProcessId());
process.destroy();
if (!process.waitFor(5, TimeUnit.SECONDS)) {
process.destroyForcibly();
}
- logger.info("Success kill task: {}, pid: {}",
taskRequest.getTaskAppId(), taskRequest.getProcessId());
- }
-
- private void printCommand(List<String> commands) {
- logger.info("task run command: {}", String.join(" ", commands));
+ log.info("Success kill task: {}, pid: {}", taskRequest.getTaskAppId(),
taskRequest.getProcessId());
}
private void collectPodLogIfNeeded() {
@@ -299,24 +283,22 @@ public abstract class AbstractCommandExecutor {
ExecutorService getOutputLogService = ThreadUtils
.newSingleDaemonScheduledExecutorService("ResolveOutputLog-thread-" +
taskRequest.getTaskName());
getOutputLogService.submit(() -> {
+ TaskOutputParameterParser taskOutputParameterParser = new
TaskOutputParameterParser();
try (BufferedReader inReader = new BufferedReader(new
InputStreamReader(process.getInputStream()))) {
LogUtils.setTaskInstanceLogFullPathMDC(taskRequest.getLogPath());
String line;
while ((line = inReader.readLine()) != null) {
- if (line.startsWith("${setValue(") ||
line.startsWith("#{setValue(")) {
- varPool.append(findVarPool(line));
- varPool.append("$VarPool$");
- } else {
- logBuffer.add(line);
- }
+ logBuffer.add(line);
+ taskOutputParameterParser.appendParseLog(line);
}
processLogOutputIsSuccess = true;
} catch (Exception e) {
- logger.error("Parse var pool error", e);
+ log.error("Parse var pool error", e);
processLogOutputIsSuccess = true;
} finally {
LogUtils.removeTaskInstanceLogFullPathMDC();
}
+ taskOutputParams = taskOutputParameterParser.getTaskOutputParams();
});
getOutputLogService.shutdown();
@@ -336,7 +318,7 @@ public abstract class AbstractCommandExecutor {
}
}
} catch (Exception e) {
- logger.error("Output task log error", e);
+ log.error("Output task log error", e);
} finally {
LogUtils.removeTaskInstanceLogFullPathMDC();
}
@@ -344,20 +326,6 @@ public abstract class AbstractCommandExecutor {
parseProcessOutputExecutorService.shutdown();
}
- /**
- * find var pool
- *
- * @param line
- * @return
- */
- private String findVarPool(String line) {
- Matcher matcher = SETVALUE_REGEX.matcher(line);
- if (matcher.find()) {
- return matcher.group(1);
- }
- return null;
- }
-
/**
* get remain time(s)
*
@@ -389,7 +357,7 @@ public abstract class AbstractCommandExecutor {
processId = f.getInt(process);
} catch (Exception e) {
- logger.error("Get task pid failed", e);
+ log.error("Get task pid failed", e);
}
return processId;
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTask.java
index 95ae22f272..4437df763b 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTask.java
@@ -28,24 +28,20 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
-/**
- * executive task
- */
+@Slf4j
public abstract class AbstractTask {
- protected final Logger log = LoggerFactory.getLogger(AbstractTask.class);
-
private static String groupName1 = "paramName1";
private static String groupName2 = "paramName2";
public String rgex =
String.format("['\"]\\$\\{(?<%s>.*?)}['\"]|\\$\\{(?<%s>.*?)}", groupName1,
groupName2);
- /**
- * varPool string
- */
- protected String varPool;
+ @Getter
+ @Setter
+ protected Map<String, String> taskOutputParams;
/**
* taskExecutionContext
@@ -91,14 +87,6 @@ public abstract class AbstractTask {
public abstract void cancel() throws TaskException;
- public void setVarPool(String varPool) {
- this.varPool = varPool;
- }
-
- public String getVarPool() {
- return varPool;
- }
-
/**
* get exit status code
*
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java
index 69f6aceb99..716947faa2 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java
@@ -29,15 +29,16 @@ import
org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import java.util.List;
import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
public abstract class AbstractYarnTask extends AbstractRemoteTask {
private ShellCommandExecutor shellCommandExecutor;
public AbstractYarnTask(TaskExecutionContext taskRequest) {
super(taskRequest);
- this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
- taskRequest,
- log);
+ this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
taskRequest);
}
// todo split handle to submit and track
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/ShellCommandExecutor.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/ShellCommandExecutor.java
index 2dbea62287..c9834adb86 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/ShellCommandExecutor.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/ShellCommandExecutor.java
@@ -20,17 +20,11 @@ package org.apache.dolphinscheduler.plugin.task.api;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
-import org.slf4j.Logger;
-
-/**
- * shell command executor
- */
public class ShellCommandExecutor extends AbstractCommandExecutor {
public ShellCommandExecutor(Consumer<LinkedBlockingQueue<String>>
logHandler,
- TaskExecutionContext taskRequest,
- Logger logger) {
- super(logHandler, taskRequest, logger);
+ TaskExecutionContext taskRequest) {
+ super(logHandler, taskRequest);
}
}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java
index 97f2943950..83f4b3a678 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java
@@ -35,10 +35,6 @@ public class TaskConstants {
public static final String FLINK_APPLICATION_REGEX = "JobID \\w+";
- public static final String SETVALUE_REGEX =
"[\\$#]\\{setValue\\((.*?)\\)}";
-
- public static final String DSVALUE_REGEX = "[\\$#]\\{\\((.*?)\\)dsVal}$";
-
/**
* string false
*/
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTask.java
index 1d9784ace4..f77170f482 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTask.java
@@ -24,6 +24,11 @@ import
org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.k8s.impl.K8sTaskExecutor;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
+import java.util.Map;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
public abstract class AbstractK8sTask extends AbstractRemoteTask {
/**
@@ -37,7 +42,7 @@ public abstract class AbstractK8sTask extends
AbstractRemoteTask {
*/
protected AbstractK8sTask(TaskExecutionContext taskRequest) {
super(taskRequest);
- this.abstractK8sTaskExecutor = new K8sTaskExecutor(log, taskRequest);
+ this.abstractK8sTaskExecutor = new K8sTaskExecutor(taskRequest);
}
// todo split handle to submit and track
@@ -47,7 +52,7 @@ public abstract class AbstractK8sTask extends
AbstractRemoteTask {
TaskResponse response =
abstractK8sTaskExecutor.run(buildCommand());
setExitStatusCode(response.getExitStatusCode());
setAppIds(response.getAppIds());
- dealOutParam(abstractK8sTaskExecutor.getVarPool());
+ dealOutParam(abstractK8sTaskExecutor.getTaskOutputParams());
} catch (Exception e) {
log.error("k8s task submit failed with error");
exitStatusCode = -1;
@@ -86,5 +91,5 @@ public abstract class AbstractK8sTask extends
AbstractRemoteTask {
*/
protected abstract String buildCommand();
- protected abstract void dealOutParam(String result);
+ protected abstract void dealOutParam(Map<String, String> taskOutputParams);
}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTaskExecutor.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTaskExecutor.java
index 6ad8869d33..1313dc23a6 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTaskExecutor.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTaskExecutor.java
@@ -22,25 +22,25 @@ import
org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
import org.apache.dolphinscheduler.plugin.task.api.utils.K8sUtils;
-import org.slf4j.Logger;
+import java.util.HashMap;
+import java.util.Map;
+
import org.yaml.snakeyaml.Yaml;
public abstract class AbstractK8sTaskExecutor {
- protected Logger log;
protected TaskExecutionContext taskRequest;
protected K8sUtils k8sUtils;
protected Yaml yaml;
- protected StringBuilder varPool;
- protected AbstractK8sTaskExecutor(Logger log, TaskExecutionContext
taskRequest) {
- this.log = log;
+ protected volatile Map<String, String> taskOutputParams;
+ protected AbstractK8sTaskExecutor(TaskExecutionContext taskRequest) {
this.taskRequest = taskRequest;
this.k8sUtils = new K8sUtils();
this.yaml = new Yaml();
- this.varPool = new StringBuilder();
+ this.taskOutputParams = new HashMap<>();
}
- public String getVarPool() {
- return varPool.toString();
+ public Map<String, String> getTaskOutputParams() {
+ return taskOutputParams;
}
public abstract TaskResponse run(String k8sParameterStr) throws Exception;
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java
index 66665512b6..476ed85be9 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java
@@ -44,10 +44,10 @@ import
org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.plugin.task.api.k8s.AbstractK8sTaskExecutor;
import org.apache.dolphinscheduler.plugin.task.api.k8s.K8sTaskMainParameters;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
+import
org.apache.dolphinscheduler.plugin.task.api.parser.TaskOutputParameterParser;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.plugin.task.api.utils.MapUtils;
import org.apache.dolphinscheduler.plugin.task.api.utils.ProcessUtils;
-import org.apache.dolphinscheduler.plugin.task.api.utils.VarPoolUtils;
import org.apache.commons.lang3.StringUtils;
@@ -64,8 +64,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
-import org.slf4j.Logger;
-
+import lombok.extern.slf4j.Slf4j;
import io.fabric8.kubernetes.api.model.Affinity;
import io.fabric8.kubernetes.api.model.AffinityBuilder;
import io.fabric8.kubernetes.api.model.EnvVar;
@@ -84,14 +83,15 @@ import io.fabric8.kubernetes.client.dsl.LogWatch;
/**
* K8sTaskExecutor used to submit k8s task to K8S
*/
+@Slf4j
public class K8sTaskExecutor extends AbstractK8sTaskExecutor {
private Job job;
protected boolean podLogOutputIsFinished = false;
protected Future<?> podLogOutputFuture;
- public K8sTaskExecutor(Logger logger, TaskExecutionContext taskRequest) {
- super(logger, taskRequest);
+ public K8sTaskExecutor(TaskExecutionContext taskRequest) {
+ super(taskRequest);
}
public void buildK8sJob(K8sTaskMainParameters k8STaskMainParameters) {
@@ -255,6 +255,7 @@ public class K8sTaskExecutor extends
AbstractK8sTaskExecutor {
String taskName = taskRequest.getTaskName().toLowerCase(Locale.ROOT);
String containerName = String.format("%s-%s", taskName,
taskInstanceId);
podLogOutputFuture = collectPodLogExecutorService.submit(() -> {
+ TaskOutputParameterParser taskOutputParameterParser = new
TaskOutputParameterParser();
try (
LogWatch watcher =
ProcessUtils.getPodLogWatcher(taskRequest.getK8sTaskExecutionContext(),
taskRequest.getTaskAppId(), containerName)) {
@@ -263,11 +264,7 @@ public class K8sTaskExecutor extends
AbstractK8sTaskExecutor {
try (BufferedReader reader = new BufferedReader(new
InputStreamReader(watcher.getOutput()))) {
while ((line = reader.readLine()) != null) {
log.info("[K8S-pod-log] {}", line);
-
- if (line.endsWith(VarPoolUtils.VAR_SUFFIX)) {
- varPool.append(VarPoolUtils.findVarPool(line));
- varPool.append(VarPoolUtils.VAR_DELIMITER);
- }
+ taskOutputParameterParser.appendParseLog(line);
}
}
} catch (Exception e) {
@@ -276,6 +273,7 @@ public class K8sTaskExecutor extends
AbstractK8sTaskExecutor {
LogUtils.removeTaskInstanceLogFullPathMDC();
podLogOutputIsFinished = true;
}
+ taskOutputParams = taskOutputParameterParser.getTaskOutputParams();
});
collectPodLogExecutorService.shutdown();
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/loop/BaseLoopTaskExecutor.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/loop/BaseLoopTaskExecutor.java
index 7b75def685..4d52a38f73 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/loop/BaseLoopTaskExecutor.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/loop/BaseLoopTaskExecutor.java
@@ -30,12 +30,14 @@ import java.time.Duration;
import javax.annotation.Nullable;
import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
/**
* This class is the base class for all loop task type.
* <p>
* The loop task type means, we will submit a task, and loop the task status
until the task is finished.
*/
+@Slf4j
public abstract class BaseLoopTaskExecutor extends AbstractRemoteTask {
/**
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/AbstractParameters.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/AbstractParameters.java
index 78812fcb24..a57eececf5 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/AbstractParameters.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/AbstractParameters.java
@@ -24,21 +24,21 @@ import
org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import
org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper;
import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import lombok.extern.slf4j.Slf4j;
+
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
-/**
- * job params related class
- */
+@Slf4j
public abstract class AbstractParameters implements IParameters {
@Override
@@ -130,7 +130,7 @@ public abstract class AbstractParameters implements
IParameters {
}
}
- public void dealOutParam(String result) {
+ public void dealOutParam(Map<String, String> taskOutputParams) {
if (CollectionUtils.isEmpty(localParams)) {
return;
}
@@ -138,19 +138,18 @@ public abstract class AbstractParameters implements
IParameters {
if (CollectionUtils.isEmpty(outProperty)) {
return;
}
- if (StringUtils.isEmpty(result)) {
+ if (MapUtils.isEmpty(taskOutputParams)) {
outProperty.forEach(this::addPropertyToValPool);
return;
}
- Map<String, String> taskResult = getMapByString(result);
- if (taskResult.size() == 0) {
- return;
- }
+
for (Property info : outProperty) {
- String propValue = taskResult.get(info.getProp());
+ String propValue = taskOutputParams.get(info.getProp());
if (StringUtils.isNotEmpty(propValue)) {
info.setValue(propValue);
addPropertyToValPool(info);
+ } else {
+ log.warn("Cannot find the output parameter {} in the task
output parameters", info.getProp());
}
}
}
@@ -178,23 +177,6 @@ public abstract class AbstractParameters implements
IParameters {
return allParams;
}
- /**
- * shell's result format is key=value$VarPool$key=value$VarPool$
- * @param result
- * @return
- */
- public static Map<String, String> getMapByString(String result) {
- String[] formatResult = result.split("\\$VarPool\\$");
- Map<String, String> format = new HashMap<>();
- for (String info : formatResult) {
- if (StringUtils.isNotEmpty(info) && info.contains("=")) {
- String[] keyValue = info.split("=");
- format.put(keyValue[0], keyValue[1]);
- }
- }
- return format;
- }
-
public ResourceParametersHelper getResources() {
return new ResourceParametersHelper();
}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/SqlParameters.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/SqlParameters.java
index bff2fe20d7..0f1a893a30 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/SqlParameters.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/SqlParameters.java
@@ -236,7 +236,6 @@ public class SqlParameters extends AbstractParameters {
return new ArrayList<>();
}
- @Override
public void dealOutParam(String result) {
if (CollectionUtils.isEmpty(localParams)) {
return;
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parser/TaskOutputParameterParser.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parser/TaskOutputParameterParser.java
new file mode 100644
index 0000000000..e79d44e4da
--- /dev/null
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parser/TaskOutputParameterParser.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.plugin.task.api.parser;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Used to parse ${setValue()} and #{setValue()} from given lines.
+ */
+@Slf4j
+@NotThreadSafe
+public class TaskOutputParameterParser {
+
+ private final Map<String, String> taskOutputParams = new HashMap<>();
+
+ private List<String> currentTaskOutputParam;
+
+ public void appendParseLog(String log) {
+ if (log == null) {
+ return;
+ }
+
+ if (currentTaskOutputParam != null) {
+ // continue to parse the rest of line
+ int i = log.indexOf(")}");
+ if (i == -1) {
+ // the end of var pool not found
+ currentTaskOutputParam.add(log);
+ } else {
+ // the end of var pool found
+ currentTaskOutputParam.add(log.substring(0, i + 2));
+ Pair<String, String> keyValue =
parseOutputParam(String.join("\n", currentTaskOutputParam));
+ if (keyValue.getKey() != null && keyValue.getValue() != null) {
+ taskOutputParams.put(keyValue.getKey(),
keyValue.getValue());
+ }
+ currentTaskOutputParam = null;
+ // continue to parse the rest of line
+ if (i + 2 != log.length()) {
+ appendParseLog(log.substring(i + 2));
+ }
+ }
+ return;
+ }
+
+ int indexOfVarPoolBegin = log.indexOf("${setValue(");
+ if (indexOfVarPoolBegin == -1) {
+ indexOfVarPoolBegin = log.indexOf("#{setValue(");
+ }
+ if (indexOfVarPoolBegin == -1) {
+ return;
+ }
+ currentTaskOutputParam = new ArrayList<>();
+ appendParseLog(log.substring(indexOfVarPoolBegin));
+ }
+
+ public Map<String, String> getTaskOutputParams() {
+ return taskOutputParams;
+ }
+
+ // #{setValue(xx=xx)}
+ protected Pair<String, String> parseOutputParam(String outputParam) {
+ if (StringUtils.isEmpty(outputParam)) {
+ log.info("The task output param is empty");
+ return ImmutablePair.nullPair();
+ }
+ if ((!outputParam.startsWith("${setValue(") &&
!outputParam.startsWith("#{setValue("))
+ || !outputParam.endsWith(")}")) {
+ log.info("The task output param {} should start with '${setValue('
or '#{setValue(' and end with ')}'",
+ outputParam);
+ return ImmutablePair.nullPair();
+ }
+ String keyValueExpression = outputParam.substring(11,
outputParam.length() - 2);
+ if (!keyValueExpression.contains("=")) {
+ log.warn("The task output param {} should composite with
key=value", outputParam);
+ return ImmutablePair.nullPair();
+ }
+
+ String[] keyValue = keyValueExpression.split("=", 2);
+ return ImmutablePair.of(keyValue[0], keyValue[1]);
+ }
+
+}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/VarPoolUtils.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/VarPoolUtils.java
deleted file mode 100644
index bb777e7126..0000000000
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/VarPoolUtils.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.plugin.task.api.utils;
-
-import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
-
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import lombok.experimental.UtilityClass;
-
-@UtilityClass
-public class VarPoolUtils {
-
- static final Pattern DSVALUE_REGEX =
Pattern.compile(TaskConstants.DSVALUE_REGEX);
- public static final String VAR_SUFFIX = ")dsVal}";
-
- public static final String VAR_DELIMITER = "$VarPool$";
- /**
- * find var pool
- *
- * @param line
- * @return
- */
- public static String findVarPool(String line) {
- Matcher matcher = DSVALUE_REGEX.matcher(line);
- if (matcher.find()) {
- return matcher.group(1);
- }
- return null;
- }
-}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTaskTest.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTaskTest.java
index 9cf407e00f..a7439aef67 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTaskTest.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTaskTest.java
@@ -38,24 +38,4 @@ public class AbstractTaskTest {
Assertions.assertEquals(jobId, str.substring(6));
}
- @Test
- public void testSetValue() {
- Pattern SETVALUE_REGEX = Pattern.compile(TaskConstants.SETVALUE_REGEX);
- String line1 = "${setValue(sql=\"INSERT INTO a VALUES (1, 2);\")}";
- String line2 = "${setValue(a=2))}";
- Matcher matcher1 = SETVALUE_REGEX.matcher(line1);
- String str1 = null;
- if (matcher1.find()) {
- str1 = matcher1.group();
- }
- String str2 = null;
- Matcher matcher2 = SETVALUE_REGEX.matcher(line2);
- if (matcher2.find()) {
- str2 = matcher2.group();
- }
- Assertions.assertNotNull(str1);
- Assertions.assertNotNull(str2);
- Assertions.assertEquals(str1.length(), line1.length());
- Assertions.assertEquals(str2.length(), line2.length());
- }
}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskExecutorTest.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskExecutorTest.java
index 2a793dc80a..4bbd29d18e 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskExecutorTest.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskExecutorTest.java
@@ -18,7 +18,6 @@
package org.apache.dolphinscheduler.plugin.task.api.k8s;
import static
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_KILL;
-import static
org.apache.dolphinscheduler.plugin.task.api.utils.VarPoolUtils.VAR_DELIMITER;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
@@ -65,7 +64,7 @@ public class K8sTaskExecutorTest {
requirement.setKey("node-label");
requirement.setOperator("In");
requirement.setValues(Arrays.asList("1234", "123456"));
- k8sTaskExecutor = new K8sTaskExecutor(logger, taskRequest);
+ k8sTaskExecutor = new K8sTaskExecutor(taskRequest);
k8sTaskMainParameters = new K8sTaskMainParameters();
k8sTaskMainParameters.setImage(image);
k8sTaskMainParameters.setImagePullPolicy(imagePullPolicy);
@@ -102,10 +101,4 @@ public class K8sTaskExecutorTest {
}
}
- @Test
- public void testValpool() {
- String result = "key=value" + VAR_DELIMITER;
- k8sTaskExecutor.varPool.append(result);
- Assertions.assertEquals(result, k8sTaskExecutor.getVarPool());
- }
}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/parser/TaskOutputParameterParserTest.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/parser/TaskOutputParameterParserTest.java
new file mode 100644
index 0000000000..950ed822c0
--- /dev/null
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/parser/TaskOutputParameterParserTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.api.parser;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import com.google.common.collect.ImmutableMap;
+
+class TaskOutputParameterParserTest {
+
+ @Test
+ void testEmptyLog() throws IOException, URISyntaxException {
+ List<String> varPools = getLogs("/outputParam/emptyVarPoolLog.txt");
+ TaskOutputParameterParser taskOutputParameterParser = new
TaskOutputParameterParser();
+ varPools.forEach(taskOutputParameterParser::appendParseLog);
+
Assertions.assertTrue(taskOutputParameterParser.getTaskOutputParams().isEmpty());
+ }
+
+ @Test
+ void testOneLineLog() throws IOException, URISyntaxException {
+ List<String> varPools = getLogs("/outputParam/onelineVarPoolLog.txt");
+ TaskOutputParameterParser taskOutputParameterParser = new
TaskOutputParameterParser();
+ varPools.forEach(taskOutputParameterParser::appendParseLog);
+ assertEquals(ImmutableMap.of("name", "name=tom"),
taskOutputParameterParser.getTaskOutputParams());
+ }
+
+ @Test
+ void testOneVarPollInMultiLineLog() throws IOException, URISyntaxException
{
+ List<String> varPools =
getLogs("/outputParam/oneVarPollInMultiLineLog.txt");
+ TaskOutputParameterParser taskOutputParameterParser = new
TaskOutputParameterParser();
+ varPools.forEach(taskOutputParameterParser::appendParseLog);
+ assertEquals(ImmutableMap.of("sql",
+ "select * from table\n" +
+ "where\n" +
+ "id = 1\n"),
+ taskOutputParameterParser.getTaskOutputParams());
+ }
+
+ @Test
+ void testVarPollInMultiLineLog() throws IOException, URISyntaxException {
+ List<String> varPools = getLogs("/outputParam/multipleVarPoll.txt");
+ TaskOutputParameterParser taskOutputParameterParser = new
TaskOutputParameterParser();
+ varPools.forEach(taskOutputParameterParser::appendParseLog);
+ assertEquals(ImmutableMap.of("name", "tom", "age", "1"),
taskOutputParameterParser.getTaskOutputParams());
+ }
+
+ private List<String> getLogs(String file) throws IOException,
URISyntaxException {
+ URI uri =
TaskOutputParameterParserTest.class.getResource(file).toURI();
+ return Files.lines(Paths.get(uri)).collect(Collectors.toList());
+ }
+}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/utils/VarPoolUtilsTest.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/utils/VarPoolUtilsTest.java
deleted file mode 100644
index e083554bde..0000000000
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/utils/VarPoolUtilsTest.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.plugin.task.api.utils;
-
-import java.util.HashMap;
-
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-
-class VarPoolUtilsTest {
-
- @Test
- void findVar() {
- HashMap<String, String> tcs = new HashMap<>();
- tcs.put("${(set_val=123)dsVal}", "set_val=123");
- tcs.put("1970-01-01 ${(set_val=123)dsVal}", "set_val=123");
- tcs.put("1970-01-01 ${(set_val=123)dsVal}123", null);
- tcs.put("${(set_val=123}dsVal", null);
- tcs.put("#{(set_val=123)dsVal}", "set_val=123");
- tcs.put("1970-01-01 #{(set_val=123)dsVal}", "set_val=123");
- tcs.put("1970-01-01 #{(set_val=123)dsVal}123", null);
- tcs.put("#{(set_val=123)dsVal}123", null);
- tcs.put("#{(set_val=123dsVal}", null);
-
- tcs.put("${(set_val=123)dsVal}${(set_val=456)dsVal}",
"set_val=123)dsVal}${(set_val=456");
- tcs.put("1970-01-01$#{(set_val=123)dsVal}", "set_val=123");
- tcs.put("1970-01-01{(set_val=123)dsVal}123", null);
- tcs.put("1970-01-01$#{(${(set_val=123)})dsVal}", "${(set_val=123)}");
- tcs.put("1970-01-01$#{(${(set_val=123\\)})dsVal}",
"${(set_val=123\\)}");
-
- for (String tc : tcs.keySet()) {
- Assertions.assertEquals(tcs.get(tc), VarPoolUtils.findVarPool(tc));
- }
- }
-}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/outputParam/emptyVarPoolLog.txt
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/outputParam/emptyVarPoolLog.txt
new file mode 100644
index 0000000000..a9fd83fea0
--- /dev/null
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/outputParam/emptyVarPoolLog.txt
@@ -0,0 +1,16 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
\ No newline at end of file
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/outputParam/multipleVarPoll.txt
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/outputParam/multipleVarPoll.txt
new file mode 100644
index 0000000000..994cee503e
--- /dev/null
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/outputParam/multipleVarPoll.txt
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+INFO: ${setValue(name=tom)}
+INFO: ${setValue(age=1)}
\ No newline at end of file
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/outputParam/oneVarPollInMultiLineLog.txt
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/outputParam/oneVarPollInMultiLineLog.txt
new file mode 100644
index 0000000000..b26467cb27
--- /dev/null
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/outputParam/oneVarPollInMultiLineLog.txt
@@ -0,0 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+${setValue(sql=select * from table
+where
+id = 1
+)}
\ No newline at end of file
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/outputParam/onelineVarPoolLog.txt
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/outputParam/onelineVarPoolLog.txt
new file mode 100644
index 0000000000..6d80b15695
--- /dev/null
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/outputParam/onelineVarPoolLog.txt
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+${setValue(name=name=tom)}
\ No newline at end of file
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/src/main/java/org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/src/main/java/org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunTask.java
index 2e4731d704..e3814a076d 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/src/main/java/org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/src/main/java/org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunTask.java
@@ -45,9 +45,9 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
-/**
- * chunjun task
- */
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
public class ChunJunTask extends AbstractTask {
/**
@@ -79,8 +79,7 @@ public class ChunJunTask extends AbstractTask {
super(taskExecutionContext);
this.taskExecutionContext = taskExecutionContext;
- this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
- taskExecutionContext, log);
+ this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
taskExecutionContext);
}
/**
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-datafactory/src/main/java/org/apache/dolphinscheduler/plugin/task/datafactory/DatafactoryTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-datafactory/src/main/java/org/apache/dolphinscheduler/plugin/task/datafactory/DatafactoryTask.java
index e36f0659d3..394a58f417 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-datafactory/src/main/java/org/apache/dolphinscheduler/plugin/task/datafactory/DatafactoryTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-datafactory/src/main/java/org/apache/dolphinscheduler/plugin/task/datafactory/DatafactoryTask.java
@@ -30,9 +30,11 @@ import java.util.List;
import lombok.Getter;
import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
@Setter
@Getter
+@Slf4j
public class DatafactoryTask extends AbstractRemoteTask {
private final TaskExecutionContext taskExecutionContext;
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-dataquality/src/main/java/org/apache/dolphinscheduler/plugin/task/dq/DataQualityTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-dataquality/src/main/java/org/apache/dolphinscheduler/plugin/task/dq/DataQualityTask.java
index 381b8cbd37..1bf1a6454c 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-dataquality/src/main/java/org/apache/dolphinscheduler/plugin/task/dq/DataQualityTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-dataquality/src/main/java/org/apache/dolphinscheduler/plugin/task/dq/DataQualityTask.java
@@ -58,11 +58,14 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
+
/**
* In DataQualityTask, the input parameters will be converted into
DataQualityConfiguration,
* which will be converted into a string as the parameter of
DataQualityApplication,
* and DataQualityApplication is spark application
*/
+@Slf4j
public class DataQualityTask extends AbstractYarnTask {
/**
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-datasync/src/main/java/org/apache/dolphinscheduler/plugin/task/datasync/DatasyncTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-datasync/src/main/java/org/apache/dolphinscheduler/plugin/task/datasync/DatasyncTask.java
index c9e3afd29b..2353dc5c4a 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-datasync/src/main/java/org/apache/dolphinscheduler/plugin/task/datasync/DatasyncTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-datasync/src/main/java/org/apache/dolphinscheduler/plugin/task/datasync/DatasyncTask.java
@@ -36,6 +36,7 @@ import java.util.Collections;
import java.util.List;
import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -43,6 +44,7 @@ import
com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.fasterxml.jackson.databind.json.JsonMapper;
@Setter
+@Slf4j
public class DatasyncTask extends AbstractRemoteTask {
private static final ObjectMapper objectMapper =
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java
index 9b55f00fc9..6490b8b0b0 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java
@@ -56,6 +56,8 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
+import lombok.extern.slf4j.Slf4j;
+
import com.alibaba.druid.sql.ast.SQLStatement;
import com.alibaba.druid.sql.ast.expr.SQLIdentifierExpr;
import com.alibaba.druid.sql.ast.expr.SQLPropertyExpr;
@@ -68,6 +70,7 @@ import com.alibaba.druid.sql.parser.SQLStatementParser;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
+@Slf4j
public class DataxTask extends AbstractTask {
/**
@@ -125,8 +128,7 @@ public class DataxTask extends AbstractTask {
super(taskExecutionContext);
this.taskExecutionContext = taskExecutionContext;
- this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
- taskExecutionContext, log);
+ this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
taskExecutionContext);
}
/**
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-dinky/src/main/java/org/apache/dolphinscheduler/plugin/task/dinky/DinkyTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-dinky/src/main/java/org/apache/dolphinscheduler/plugin/task/dinky/DinkyTask.java
index fdaf70add4..694317122e 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-dinky/src/main/java/org/apache/dolphinscheduler/plugin/task/dinky/DinkyTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-dinky/src/main/java/org/apache/dolphinscheduler/plugin/task/dinky/DinkyTask.java
@@ -45,28 +45,20 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
+
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.MissingNode;
+@Slf4j
public class DinkyTask extends AbstractRemoteTask {
- /**
- * taskExecutionContext
- */
private final TaskExecutionContext taskExecutionContext;
- /**
- * dinky parameters
- */
private DinkyParameters dinkyParameters;
- /**
- * constructor
- *
- * @param taskExecutionContext taskExecutionContext
- */
protected DinkyTask(TaskExecutionContext taskExecutionContext) {
super(taskExecutionContext);
this.taskExecutionContext = taskExecutionContext;
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-dms/src/main/java/org/apache/dolphinscheduler/plugin/task/dms/DmsTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-dms/src/main/java/org/apache/dolphinscheduler/plugin/task/dms/DmsTask.java
index 74cf5b9b29..29424a5b76 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-dms/src/main/java/org/apache/dolphinscheduler/plugin/task/dms/DmsTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-dms/src/main/java/org/apache/dolphinscheduler/plugin/task/dms/DmsTask.java
@@ -35,12 +35,15 @@ import org.apache.commons.lang3.StringUtils;
import java.util.Collections;
import java.util.List;
+import lombok.extern.slf4j.Slf4j;
+
import
com.amazonaws.services.databasemigrationservice.model.InvalidResourceStateException;
import com.amazonaws.services.databasemigrationservice.model.ReplicationTask;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.PropertyNamingStrategy;
import com.fasterxml.jackson.databind.json.JsonMapper;
+@Slf4j
public class DmsTask extends AbstractRemoteTask {
private static final ObjectMapper objectMapper =
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java
index 032d953858..d1a8bf8b1b 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java
@@ -33,9 +33,9 @@ import
org.apache.dolphinscheduler.plugin.task.api.shell.ShellInterceptorBuilder
import java.util.ArrayList;
import java.util.List;
-/**
- * shell task
- */
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
public class DvcTask extends AbstractTask {
/**
@@ -62,7 +62,7 @@ public class DvcTask extends AbstractTask {
super(taskExecutionContext);
this.taskExecutionContext = taskExecutionContext;
- this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
taskExecutionContext, log);
+ this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
taskExecutionContext);
}
@Override
@@ -85,7 +85,7 @@ public class DvcTask extends AbstractTask {
TaskResponse commandExecuteResult =
shellCommandExecutor.run(shellActuatorBuilder, taskCallBack);
setExitStatusCode(commandExecuteResult.getExitStatusCode());
setProcessId(commandExecuteResult.getProcessId());
- parameters.dealOutParam(shellCommandExecutor.getVarPool());
+
parameters.dealOutParam(shellCommandExecutor.getTaskOutputParams());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("The current DvcTask has been interrupted", e);
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/AbstractEmrTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/AbstractEmrTask.java
index b4603c4605..6f6ec63a29 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/AbstractEmrTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/AbstractEmrTask.java
@@ -31,6 +31,8 @@ import
org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters
import java.util.TimeZone;
+import lombok.extern.slf4j.Slf4j;
+
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
@@ -44,6 +46,7 @@ import com.fasterxml.jackson.databind.PropertyNamingStrategy;
*
* @since v3.1.0
*/
+@Slf4j
public abstract class AbstractEmrTask extends AbstractRemoteTask {
final TaskExecutionContext taskExecutionContext;
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTask.java
index ff3331d530..753b206e21 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTask.java
@@ -26,6 +26,8 @@ import java.util.HashSet;
import java.util.List;
import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+
import com.amazonaws.SdkBaseException;
import com.amazonaws.services.elasticmapreduce.model.AddJobFlowStepsRequest;
import com.amazonaws.services.elasticmapreduce.model.AddJobFlowStepsResult;
@@ -45,6 +47,7 @@ import com.google.common.collect.Sets;
*
* @since v3.1.0
*/
+@Slf4j
public class EmrAddStepsTask extends AbstractEmrTask {
private String stepId;
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTask.java
index 9abff26818..f4b0534065 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTask.java
@@ -26,6 +26,8 @@ import java.util.HashSet;
import java.util.List;
import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+
import com.amazonaws.SdkBaseException;
import com.amazonaws.services.elasticmapreduce.model.ClusterState;
import com.amazonaws.services.elasticmapreduce.model.ClusterStateChangeReason;
@@ -40,6 +42,7 @@ import
com.amazonaws.services.elasticmapreduce.model.TerminateJobFlowsResult;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.collect.Sets;
+@Slf4j
public class EmrJobFlowTask extends AbstractEmrTask {
private final HashSet<String> waitingStateSet = Sets.newHashSet(
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTask.java
index 3a71df62c7..9ec10889bb 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTask.java
@@ -31,17 +31,14 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
public class FlinkStreamTask extends FlinkTask implements StreamTask {
- /**
- * flink parameters
- */
private FlinkStreamParameters flinkParameters;
- /**
- * taskExecutionContext
- */
- private TaskExecutionContext taskExecutionContext;
+ private final TaskExecutionContext taskExecutionContext;
public FlinkStreamTask(TaskExecutionContext taskExecutionContext) {
super(taskExecutionContext);
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java
index 75764c3677..4f2963116c 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java
@@ -29,6 +29,9 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
public class FlinkTask extends AbstractYarnTask {
/**
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-hivecli/src/main/java/org/apache/dolphinscheduler/plugin/task/hivecli/HiveCliTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-hivecli/src/main/java/org/apache/dolphinscheduler/plugin/task/hivecli/HiveCliTask.java
index 28443423c2..0ca6a2c39a 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-hivecli/src/main/java/org/apache/dolphinscheduler/plugin/task/hivecli/HiveCliTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-hivecli/src/main/java/org/apache/dolphinscheduler/plugin/task/hivecli/HiveCliTask.java
@@ -53,6 +53,9 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
public class HiveCliTask extends AbstractRemoteTask {
private HiveCliParameters hiveCliParameters;
@@ -65,9 +68,7 @@ public class HiveCliTask extends AbstractRemoteTask {
super(taskExecutionContext);
this.taskExecutionContext = taskExecutionContext;
- this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
- taskExecutionContext,
- log);
+ this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
taskExecutionContext);
}
@Override
@@ -96,7 +97,7 @@ public class HiveCliTask extends AbstractRemoteTask {
setExitStatusCode(taskResponse.getExitStatusCode());
setAppIds(taskResponse.getAppIds());
setProcessId(taskResponse.getProcessId());
- setVarPool(shellCommandExecutor.getVarPool());
+ setTaskOutputParams(shellCommandExecutor.getTaskOutputParams());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("The current HiveCLI Task has been interrupted", e);
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTask.java
index 8d696a19b1..d51c99b2a7 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTask.java
@@ -52,8 +52,11 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
+
import com.fasterxml.jackson.databind.node.ObjectNode;
+@Slf4j
public class HttpTask extends AbstractTask {
/**
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/main/java/org/apache/dolphinscheduler/plugin/task/java/JavaTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/main/java/org/apache/dolphinscheduler/plugin/task/java/JavaTask.java
index e9850b6459..5b4811bf8b 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/main/java/org/apache/dolphinscheduler/plugin/task/java/JavaTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/main/java/org/apache/dolphinscheduler/plugin/task/java/JavaTask.java
@@ -52,8 +52,11 @@ import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import lombok.extern.slf4j.Slf4j;
+
import com.google.common.base.Preconditions;
+@Slf4j
public class JavaTask extends AbstractTask {
/**
@@ -79,9 +82,7 @@ public class JavaTask extends AbstractTask {
public JavaTask(TaskExecutionContext taskRequest) {
super(taskRequest);
this.taskRequest = taskRequest;
- this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
- taskRequest,
- log);
+ this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
taskRequest);
}
/**
@@ -131,7 +132,7 @@ public class JavaTask extends AbstractTask {
setExitStatusCode(taskResponse.getExitStatusCode());
setAppIds(taskResponse.getAppIds());
setProcessId(taskResponse.getProcessId());
- setVarPool(shellCommandExecutor.getVarPool());
+ setTaskOutputParams(shellCommandExecutor.getTaskOutputParams());
} catch (InterruptedException e) {
log.error("java task interrupted ", e);
setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java
index 735b6237e2..5f79b139a9 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java
@@ -41,8 +41,11 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
+
import com.fasterxml.jackson.databind.ObjectMapper;
+@Slf4j
public class JupyterTask extends AbstractRemoteTask {
private JupyterParameters jupyterParameters;
@@ -54,9 +57,7 @@ public class JupyterTask extends AbstractRemoteTask {
public JupyterTask(TaskExecutionContext taskExecutionContext) {
super(taskExecutionContext);
this.taskExecutionContext = taskExecutionContext;
- this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
- taskExecutionContext,
- log);
+ this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
taskExecutionContext);
}
@Override
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTask.java
index c02adbfa76..fceb29e163 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTask.java
@@ -43,26 +43,19 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
import io.fabric8.kubernetes.api.model.NodeSelectorRequirement;
+@Slf4j
public class K8sTask extends AbstractK8sTask {
- /**
- * taskExecutionContext
- */
private final TaskExecutionContext taskExecutionContext;
- /**
- * task parameters
- */
private K8sTaskParameters k8sTaskParameters;
private K8sTaskExecutionContext k8sTaskExecutionContext;
private K8sConnectionParam k8sConnectionParam;
- /**
- * @param taskRequest taskRequest
- */
public K8sTask(TaskExecutionContext taskRequest) {
super(taskRequest);
this.taskExecutionContext = taskRequest;
@@ -119,8 +112,8 @@ public class K8sTask extends AbstractK8sTask {
}
@Override
- protected void dealOutParam(String result) {
- this.k8sTaskParameters.dealOutParam(result);
+ protected void dealOutParam(Map<String, String> taskOutputParams) {
+ this.k8sTaskParameters.dealOutParam(taskOutputParams);
}
public List<NodeSelectorRequirement>
convertToNodeSelectorRequirements(List<NodeSelectorExpression> expressions) {
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/test/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTaskTest.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/test/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTaskTest.java
index ef21d34e8e..3895190cf2 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/test/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTaskTest.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/test/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTaskTest.java
@@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.plugin.task.k8s;
-import static
org.apache.dolphinscheduler.plugin.task.api.utils.VarPoolUtils.VAR_DELIMITER;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
@@ -47,6 +46,7 @@ import org.junit.jupiter.api.Test;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
+import com.google.common.collect.ImmutableMap;
import io.fabric8.kubernetes.api.model.NodeSelectorRequirement;
public class K8sTaskTest {
@@ -167,7 +167,7 @@ public class K8sTaskTest {
@Test
public void testDealOutParam() {
- String result = "key=123" + VAR_DELIMITER;
+ Map<String, String> result = ImmutableMap.of("key", "123");
k8sTask.getParameters().localParams.add(new Property("key",
Direct.OUT, DataType.VARCHAR, "value"));
k8sTask.dealOutParam(result);
k8sTask.getParameters().getVarPool().forEach(property -> {
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-kubeflow/src/main/java/org/apache/dolphinscheduler/plugin/kubeflow/KubeflowTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-kubeflow/src/main/java/org/apache/dolphinscheduler/plugin/kubeflow/KubeflowTask.java
index 4ae9a95a5d..0a65cc160c 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-kubeflow/src/main/java/org/apache/dolphinscheduler/plugin/kubeflow/KubeflowTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-kubeflow/src/main/java/org/apache/dolphinscheduler/plugin/kubeflow/KubeflowTask.java
@@ -36,6 +36,9 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
public class KubeflowTask extends AbstractRemoteTask {
private final TaskExecutionContext taskExecutionContext;
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-linkis/src/main/java/org/apache/dolphinscheduler/plugin/task/linkis/LinkisTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-linkis/src/main/java/org/apache/dolphinscheduler/plugin/task/linkis/LinkisTask.java
index c091ee6f64..a4ab0b96a9 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-linkis/src/main/java/org/apache/dolphinscheduler/plugin/task/linkis/LinkisTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-linkis/src/main/java/org/apache/dolphinscheduler/plugin/task/linkis/LinkisTask.java
@@ -41,9 +41,9 @@ import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-/**
- * linkis task
- */
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
public class LinkisTask extends AbstractRemoteTask {
/**
@@ -76,9 +76,7 @@ public class LinkisTask extends AbstractRemoteTask {
super(taskExecutionContext);
this.taskExecutionContext = taskExecutionContext;
- this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
- taskExecutionContext,
- log);
+ this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
taskExecutionContext);
}
@Override
@@ -107,7 +105,7 @@ public class LinkisTask extends AbstractRemoteTask {
setExitStatusCode(commandExecuteResult.getExitStatusCode());
setAppIds(findTaskId(commandExecuteResult.getResultString()));
setProcessId(commandExecuteResult.getProcessId());
- linkisParameters.dealOutParam(shellCommandExecutor.getVarPool());
+
linkisParameters.dealOutParam(shellCommandExecutor.getTaskOutputParams());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("The current Linkis task has been interrupted", e);
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java
index 3c1f73747c..bc97be04e6 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java
@@ -42,9 +42,9 @@ import java.util.Map;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
-/**
- * shell task
- */
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
public class MlflowTask extends AbstractTask {
private static final Pattern GIT_CHECK_PATTERN =
Pattern.compile("^(git@|https?://)");
@@ -71,7 +71,7 @@ public class MlflowTask extends AbstractTask {
super(taskExecutionContext);
this.taskExecutionContext = taskExecutionContext;
- this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
taskExecutionContext, log);
+ this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
taskExecutionContext);
}
static public String getPresetRepository() {
@@ -130,7 +130,7 @@ public class MlflowTask extends AbstractTask {
}
setExitStatusCode(exitCode);
setProcessId(commandExecuteResult.getProcessId());
- mlflowParameters.dealOutParam(shellCommandExecutor.getVarPool());
+
mlflowParameters.dealOutParam(shellCommandExecutor.getTaskOutputParams());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("The current Mlflow task has been interrupted", e);
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceTask.java
index dd51314dfa..0979dc73cd 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceTask.java
@@ -30,9 +30,9 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
-/**
- * mapreduce task
- */
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
public class MapReduceTask extends AbstractYarnTask {
/**
@@ -41,20 +41,10 @@ public class MapReduceTask extends AbstractYarnTask {
*/
private static final String MAPREDUCE_COMMAND = TaskConstants.HADOOP;
- /**
- * mapreduce parameters
- */
private MapReduceParameters mapreduceParameters;
- /**
- * taskExecutionContext
- */
- private TaskExecutionContext taskExecutionContext;
+ private final TaskExecutionContext taskExecutionContext;
- /**
- * constructor
- * @param taskExecutionContext taskExecutionContext
- */
public MapReduceTask(TaskExecutionContext taskExecutionContext) {
super(taskExecutionContext);
this.taskExecutionContext = taskExecutionContext;
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-openmldb/src/main/java/org/apache/dolphinscheduler/plugin/task/openmldb/OpenmldbTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-openmldb/src/main/java/org/apache/dolphinscheduler/plugin/task/openmldb/OpenmldbTask.java
index 9142a364a1..6f97d0867c 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-openmldb/src/main/java/org/apache/dolphinscheduler/plugin/task/openmldb/OpenmldbTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-openmldb/src/main/java/org/apache/dolphinscheduler/plugin/task/openmldb/OpenmldbTask.java
@@ -32,11 +32,11 @@ import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import lombok.extern.slf4j.Slf4j;
+
import com.google.common.base.Preconditions;
-/**
- * openmldb task
- */
+@Slf4j
public class OpenmldbTask extends PythonTask {
/**
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-pigeon/src/main/java/org/apache/dolphinscheduler/plugin/task/pigeon/PigeonTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-pigeon/src/main/java/org/apache/dolphinscheduler/plugin/task/pigeon/PigeonTask.java
index 8bab12133e..55af378e83 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-pigeon/src/main/java/org/apache/dolphinscheduler/plugin/task/pigeon/PigeonTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-pigeon/src/main/java/org/apache/dolphinscheduler/plugin/task/pigeon/PigeonTask.java
@@ -46,12 +46,15 @@ import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
+
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake;
/**
* TIS DataX Task
**/
+@Slf4j
public class PigeonTask extends AbstractRemoteTask {
public static final String KEY_POOL_VAR_PIGEON_HOST = "p_host";
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java
index 2bdbcd5e23..6a66ba1965 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java
@@ -46,24 +46,18 @@ import java.sql.Types;
import java.util.HashMap;
import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
+
import com.google.common.collect.Maps;
-/**
- * procedure task
- */
+@Slf4j
public class ProcedureTask extends AbstractTask {
- /**
- * procedure parameters
- */
- private ProcedureParameters procedureParameters;
+ private final ProcedureParameters procedureParameters;
- /**
- * taskExecutionContext
- */
- private TaskExecutionContext taskExecutionContext;
+ private final TaskExecutionContext taskExecutionContext;
- private ProcedureTaskExecutionContext procedureTaskExecutionContext;
+ private final ProcedureTaskExecutionContext procedureTaskExecutionContext;
/**
* constructor
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java
index 3fd938d3a5..539ecd7240 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java
@@ -40,21 +40,15 @@ import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
+
import com.google.common.base.Preconditions;
-/**
- * python task
- */
+@Slf4j
public class PythonTask extends AbstractTask {
- /**
- * python parameters
- */
protected PythonParameters pythonParameters;
- /**
- * shell command executor
- */
private ShellCommandExecutor shellCommandExecutor;
protected TaskExecutionContext taskRequest;
@@ -70,9 +64,7 @@ public class PythonTask extends AbstractTask {
super(taskRequest);
this.taskRequest = taskRequest;
- this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
- taskRequest,
- log);
+ this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
taskRequest);
}
@Override
@@ -104,8 +96,8 @@ public class PythonTask extends AbstractTask {
TaskResponse taskResponse =
shellCommandExecutor.run(shellActuatorBuilder, taskCallBack);
setExitStatusCode(taskResponse.getExitStatusCode());
setProcessId(taskResponse.getProcessId());
- setVarPool(shellCommandExecutor.getVarPool());
- pythonParameters.dealOutParam(shellCommandExecutor.getVarPool());
+ setTaskOutputParams(shellCommandExecutor.getTaskOutputParams());
+
pythonParameters.dealOutParam(shellCommandExecutor.getTaskOutputParams());
} catch (Exception e) {
log.error("python task failure", e);
setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTask.java
index ffefc83a8e..1d7a874dfc 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTask.java
@@ -34,6 +34,9 @@ import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
public class PytorchTask extends AbstractTask {
private final ShellCommandExecutor shellCommandExecutor;
@@ -45,9 +48,7 @@ public class PytorchTask extends AbstractTask {
super(taskExecutionContext);
this.taskExecutionContext = taskExecutionContext;
- this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
- taskExecutionContext,
- log);
+ this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
taskExecutionContext);
}
@Override
@@ -76,7 +77,7 @@ public class PytorchTask extends AbstractTask {
TaskResponse taskResponse =
shellCommandExecutor.run(shellActuatorBuilder, taskCallBack);
setExitStatusCode(taskResponse.getExitStatusCode());
setProcessId(taskResponse.getProcessId());
- setVarPool(shellCommandExecutor.getVarPool());
+ setTaskOutputParams(shellCommandExecutor.getTaskOutputParams());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("The current Pytorch task has been interrupted", e);
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-remoteshell/src/main/java/org/apache/dolphinscheduler/plugin/task/remoteshell/RemoteExecutor.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-remoteshell/src/main/java/org/apache/dolphinscheduler/plugin/task/remoteshell/RemoteExecutor.java
index 650bf84a69..4de28b7fb5 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-remoteshell/src/main/java/org/apache/dolphinscheduler/plugin/task/remoteshell/RemoteExecutor.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-remoteshell/src/main/java/org/apache/dolphinscheduler/plugin/task/remoteshell/RemoteExecutor.java
@@ -19,8 +19,8 @@ package org.apache.dolphinscheduler.plugin.task.remoteshell;
import org.apache.dolphinscheduler.plugin.datasource.ssh.SSHUtils;
import
org.apache.dolphinscheduler.plugin.datasource.ssh.param.SSHConnectionParam;
-import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
+import
org.apache.dolphinscheduler.plugin.task.api.parser.TaskOutputParameterParser;
import org.apache.commons.lang3.StringUtils;
import org.apache.sshd.client.SshClient;
@@ -36,24 +36,19 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.EnumSet;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
+import java.util.HashMap;
+import java.util.Map;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
+@Slf4j
public class RemoteExecutor {
- protected final Logger logger =
-
LoggerFactory.getLogger(String.format(TaskConstants.TASK_LOGGER_THREAD_NAME,
getClass()));
-
- protected static final Pattern SETVALUE_REGEX =
Pattern.compile(TaskConstants.SETVALUE_REGEX);
-
static final String REMOTE_SHELL_HOME =
"/tmp/dolphinscheduler-remote-shell-%s/";
static final String STATUS_TAG_MESSAGE =
"DOLPHINSCHEDULER-REMOTE-SHELL-TASK-STATUS-";
static final int TRACK_INTERVAL = 5000;
- protected StringBuilder varPool = new StringBuilder();
+ protected Map<String, String> taskOutputParams = new HashMap<>();
SshClient sshClient;
ClientSession session;
@@ -105,60 +100,44 @@ public class RemoteExecutor {
public void track(String taskId) throws Exception {
int logN = 0;
String pid;
- logger.info("Remote shell task log:");
+ log.info("Remote shell task log:");
+ TaskOutputParameterParser taskOutputParameterParser = new
TaskOutputParameterParser();
do {
pid = getTaskPid(taskId);
String trackCommand = String.format(COMMAND.TRACK_COMMAND, logN +
1, getRemoteShellHome(), taskId);
- String log = runRemote(trackCommand);
- if (StringUtils.isEmpty(log)) {
+ String logLine = runRemote(trackCommand);
+ if (StringUtils.isEmpty(logLine)) {
Thread.sleep(TRACK_INTERVAL);
} else {
- logN += log.split("\n").length;
- setVarPool(log);
- logger.info(log);
+ logN += logLine.split("\n").length;
+ log.info(logLine);
+ taskOutputParameterParser.appendParseLog(logLine);
}
} while (StringUtils.isNotEmpty(pid));
+
taskOutputParams.putAll(taskOutputParameterParser.getTaskOutputParams());
}
- public String getVarPool() {
- return varPool.toString();
- }
-
- private void setVarPool(String log) {
- String[] lines = log.split("\n");
- for (String line : lines) {
- if (line.startsWith("${setValue(") ||
line.startsWith("#{setValue(")) {
- varPool.append(findVarPool(line));
- varPool.append("$VarPool$");
- }
- }
- }
-
- private String findVarPool(String line) {
- Matcher matcher = SETVALUE_REGEX.matcher(line);
- if (matcher.find()) {
- return matcher.group(1);
- }
- return null;
+ public Map<String, String> getTaskOutputParams() {
+ return taskOutputParams;
}
public Integer getTaskExitCode(String taskId) throws IOException {
String trackCommand = String.format(COMMAND.LOG_TAIL_COMMAND,
getRemoteShellHome(), taskId);
- String log = runRemote(trackCommand);
+ String logLine = runRemote(trackCommand);
int exitCode = -1;
- logger.info("Remote shell task run status: {}", log);
- if (log.contains(STATUS_TAG_MESSAGE)) {
- String status = log.replace(STATUS_TAG_MESSAGE, "").trim();
+ log.info("Remote shell task run status: {}", logLine);
+ if (logLine.contains(STATUS_TAG_MESSAGE)) {
+ String status = logLine.replace(STATUS_TAG_MESSAGE, "").trim();
if (status.equals("0")) {
- logger.info("Remote shell task success");
+ log.info("Remote shell task success");
exitCode = 0;
} else {
- logger.error("Remote shell task failed");
+ log.error("Remote shell task failed");
exitCode = Integer.parseInt(status);
}
}
cleanData(taskId);
- logger.error("Remote shell task failed");
+ log.error("Remote shell task failed");
return exitCode;
}
@@ -168,7 +147,7 @@ public class RemoteExecutor {
try {
runRemote(cleanCommand);
} catch (Exception e) {
- logger.error("Remote shell task clean data failed, but will not
affect the task execution", e);
+ log.error("Remote shell task clean data failed, but will not
affect the task execution", e);
}
}
@@ -189,14 +168,14 @@ public class RemoteExecutor {
runRemote(checkDirCommand);
uploadScript(taskId, localFile);
- logger.info("The final script is: \n{}",
+ log.info("The final script is: \n{}",
runRemote(String.format(COMMAND.CAT_FINAL_SCRIPT,
getRemoteShellHome(), taskId)));
}
public void uploadScript(String taskId, String localFile) throws
IOException {
String remotePath = getRemoteShellHome() + taskId + ".sh";
- logger.info("upload script from local:{} to remote: {}", localFile,
remotePath);
+ log.info("upload script from local:{} to remote: {}", localFile,
remotePath);
try (SftpFileSystem fs =
SftpClientFactory.instance().createSftpFileSystem(getSession())) {
Path path = fs.getPath(remotePath);
Files.copy(Paths.get(localFile), path);
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-remoteshell/src/main/java/org/apache/dolphinscheduler/plugin/task/remoteshell/RemoteShellTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-remoteshell/src/main/java/org/apache/dolphinscheduler/plugin/task/remoteshell/RemoteShellTask.java
index 216203bfea..561392896c 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-remoteshell/src/main/java/org/apache/dolphinscheduler/plugin/task/remoteshell/RemoteShellTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-remoteshell/src/main/java/org/apache/dolphinscheduler/plugin/task/remoteshell/RemoteShellTask.java
@@ -42,9 +42,9 @@ import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Map;
-/**
- * shell task
- */
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
public class RemoteShellTask extends AbstractTask {
static final String TASK_ID_PREFIX = "dolphinscheduler-remoteshell-";
@@ -102,7 +102,7 @@ public class RemoteShellTask extends AbstractTask {
String localFile = buildCommand();
int exitCode = remoteExecutor.run(taskId, localFile);
setExitStatusCode(exitCode);
- remoteShellParameters.dealOutParam(remoteExecutor.getVarPool());
+
remoteShellParameters.dealOutParam(remoteExecutor.getTaskOutputParams());
} catch (Exception e) {
log.error("shell task error", e);
setExitStatusCode(EXIT_CODE_FAILURE);
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTask.java
index d04f5a3fac..b1b2cc811f 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTask.java
@@ -39,6 +39,8 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
+
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
@@ -52,6 +54,7 @@ import com.fasterxml.jackson.databind.json.JsonMapper;
/**
* SagemakerTask task, Used to start Sagemaker pipeline
*/
+@Slf4j
public class SagemakerTask extends AbstractRemoteTask {
private static final ObjectMapper objectMapper = JsonMapper.builder()
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java
index 0b837fa568..b6d0b6136e 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java
@@ -47,9 +47,9 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
-/**
- * seatunnel task
- */
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
public class SeatunnelTask extends AbstractRemoteTask {
private static final String SEATUNNEL_BIN_DIR = "${SEATUNNEL_HOME}/bin/";
@@ -78,9 +78,7 @@ public class SeatunnelTask extends AbstractRemoteTask {
super(taskExecutionContext);
this.taskExecutionContext = taskExecutionContext;
- this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
- taskExecutionContext,
- log);
+ this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
taskExecutionContext);
}
@Override
@@ -109,7 +107,7 @@ public class SeatunnelTask extends AbstractRemoteTask {
setExitStatusCode(commandExecuteResult.getExitStatusCode());
setAppIds(String.join(TaskConstants.COMMA, getApplicationIds()));
setProcessId(commandExecuteResult.getProcessId());
-
seatunnelParameters.dealOutParam(shellCommandExecutor.getVarPool());
+
seatunnelParameters.dealOutParam(shellCommandExecutor.getTaskOutputParams());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("The current SeaTunnel task has been interrupted", e);
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java
index b657f9a9ef..71cfb9522c 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java
@@ -31,9 +31,9 @@ import
org.apache.dolphinscheduler.plugin.task.api.shell.IShellInterceptorBuilde
import
org.apache.dolphinscheduler.plugin.task.api.shell.ShellInterceptorBuilderFactory;
import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
-/**
- * shell task
- */
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
public class ShellTask extends AbstractTask {
/**
@@ -60,9 +60,7 @@ public class ShellTask extends AbstractTask {
super(taskExecutionContext);
this.taskExecutionContext = taskExecutionContext;
- this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
- taskExecutionContext,
- log);
+ this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
taskExecutionContext);
}
@Override
@@ -87,7 +85,7 @@ public class ShellTask extends AbstractTask {
TaskResponse commandExecuteResult =
shellCommandExecutor.run(shellActuatorBuilder, taskCallBack);
setExitStatusCode(commandExecuteResult.getExitStatusCode());
setProcessId(commandExecuteResult.getProcessId());
- shellParameters.dealOutParam(shellCommandExecutor.getVarPool());
+
shellParameters.dealOutParam(shellCommandExecutor.getTaskOutputParams());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("The current Shell task has been interrupted", e);
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java
index ef0b71c8fe..99a9d9e61b 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java
@@ -53,19 +53,15 @@ import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
import io.fabric8.kubernetes.client.Config;
+@Slf4j
public class SparkTask extends AbstractYarnTask {
- /**
- * spark parameters
- */
private SparkParameters sparkParameters;
- /**
- * taskExecutionContext
- */
- private TaskExecutionContext taskExecutionContext;
+ private final TaskExecutionContext taskExecutionContext;
public SparkTask(TaskExecutionContext taskExecutionContext) {
super(taskExecutionContext);
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java
index 091dfc5794..77886db59c 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java
@@ -61,24 +61,18 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
+
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
+@Slf4j
public class SqlTask extends AbstractTask {
- /**
- * taskExecutionContext
- */
- private TaskExecutionContext taskExecutionContext;
+ private final TaskExecutionContext taskExecutionContext;
- /**
- * sql parameters
- */
- private SqlParameters sqlParameters;
+ private final SqlParameters sqlParameters;
- /**
- * base datasource
- */
private BaseConnectionParam baseConnectionParam;
/**
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/SqoopTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/SqoopTask.java
index 016a07a56b..86aea61f0c 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/SqoopTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/SqoopTask.java
@@ -29,19 +29,16 @@ import
org.apache.dolphinscheduler.plugin.task.sqoop.parameter.SqoopParameters;
import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
+
/**
* sqoop task extends the shell task
*/
+@Slf4j
public class SqoopTask extends AbstractYarnTask {
- /**
- * sqoop task params
- */
private SqoopParameters sqoopParameters;
- /**
- * taskExecutionContext
- */
private final TaskExecutionContext taskExecutionContext;
private SqoopTaskExecutionContext sqoopTaskExecutionContext;
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java
index 01459111b5..c3b580de35 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java
@@ -42,19 +42,15 @@ import java.util.List;
import java.util.Map;
import kong.unirest.Unirest;
+import lombok.extern.slf4j.Slf4j;
import com.fasterxml.jackson.databind.ObjectMapper;
+@Slf4j
public class ZeppelinTask extends AbstractRemoteTask {
- /**
- * taskExecutionContext
- */
private final TaskExecutionContext taskExecutionContext;
- /**
- * zeppelin parameters
- */
private ZeppelinParameters zeppelinParameters;
/**
@@ -66,11 +62,6 @@ public class ZeppelinTask extends AbstractRemoteTask {
private ZeppelinTaskExecutionContext zeppelinTaskExecutionContext;
- /**
- * constructor
- *
- * @param taskExecutionContext taskExecutionContext
- */
protected ZeppelinTask(TaskExecutionContext taskExecutionContext) {
super(taskExecutionContext);
this.taskExecutionContext = taskExecutionContext;