This is an automated email from the ASF dual-hosted git repository.
zihaoxiang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 26fe6fe53c [Fix-17780][TaskPlugin] Fix shell output log that might be lost
due to incorrect usage of log buffer (#17790)
26fe6fe53c is described below
commit 26fe6fe53cd67aacbb0744bf044b2699630f5e16
Author: Wenjun Ruan <[email protected]>
AuthorDate: Mon Dec 22 12:26:03 2025 +0800
[Fix-17780][TaskPlugin] Fix shell output log that might be lost due to
incorrect usage of log buffer (#17790)
---
docs/docs/en/guide/upgrade/incompatible.md | 1 +
.../plugin/task/api/AbstractCommandExecutor.java | 121 +++++----------------
.../plugin/task/api/AbstractTask.java | 16 ---
.../plugin/task/api/AbstractYarnTask.java | 4 +-
.../plugin/task/api/ShellCommandExecutor.java | 8 +-
.../plugin/task/chunjun/ChunJunTask.java | 28 +----
.../plugin/task/datax/DataxTask.java | 31 +-----
.../dolphinscheduler/plugin/task/dvc/DvcTask.java | 26 +----
.../plugin/task/hivecli/HiveCliTask.java | 18 ++-
.../plugin/task/java/JavaTask.java | 15 +--
.../plugin/task/jupyter/JupyterTask.java | 9 +-
.../plugin/task/linkis/LinkisTask.java | 26 +----
.../plugin/task/mlflow/MlflowTask.java | 23 +---
.../plugin/task/python/PythonTask.java | 12 +-
.../plugin/task/pytorch/PytorchTask.java | 12 +-
.../plugin/task/seatunnel/SeatunnelTask.java | 39 ++-----
.../task/seatunnel/flink/SeatunnelFlinkTask.java | 2 +-
.../task/seatunnel/self/SeatunnelEngineTask.java | 2 +-
.../task/seatunnel/spark/SeatunnelSparkTask.java | 2 +-
.../plugin/task/shell/ShellTask.java | 29 +----
20 files changed, 94 insertions(+), 330 deletions(-)
diff --git a/docs/docs/en/guide/upgrade/incompatible.md
b/docs/docs/en/guide/upgrade/incompatible.md
index e30058aef9..508ba65bc5 100644
--- a/docs/docs/en/guide/upgrade/incompatible.md
+++ b/docs/docs/en/guide/upgrade/incompatible.md
@@ -37,4 +37,5 @@ This document records the incompatible updates between each
version. You need to
* Renamed the publicKey field to privateKey in the SSH connection parameters
under the datasource configuration.
([#17666])(https://github.com/apache/dolphinscheduler/pull/17666)
* Add table t_ds_serial_command.
([#17531])(https://github.com/apache/dolphinscheduler/pull/17531)
* Remove the default value of `python-gateway.auth-token` at
`api-server/application.yaml`.
([#17801])(https://github.com/apache/dolphinscheduler/pull/17801)
+* Refactor the task plugins which use ShellCommandExecutor
([#17790])(https://github.com/apache/dolphinscheduler/pull/17790)
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
index 6e891298dd..86199c874e 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
@@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.plugin.task.api;
-import static
org.apache.dolphinscheduler.common.constants.Constants.EMPTY_STRING;
import static
org.apache.dolphinscheduler.common.constants.Constants.SLEEP_TIME_MILLIS;
import static
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
import static
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_HARD_KILL;
@@ -42,12 +41,10 @@ import java.io.InputStreamReader;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.ExecutionException;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
import lombok.extern.slf4j.Slf4j;
import io.fabric8.kubernetes.client.dsl.LogWatch;
@@ -59,41 +56,12 @@ import io.fabric8.kubernetes.client.dsl.LogWatch;
public abstract class AbstractCommandExecutor {
protected volatile Map<String, String> taskOutputParams = new HashMap<>();
- /**
- * process
- */
private Process process;
- /**
- * log handler
- */
- protected Consumer<LinkedBlockingQueue<String>> logHandler;
-
- /**
- * log list
- */
- protected LinkedBlockingQueue<String> logBuffer;
-
- protected boolean processLogOutputIsSuccess = false;
-
- protected boolean podLogOutputIsFinished = false;
-
- /**
- * taskRequest
- */
protected TaskExecutionContext taskRequest;
- protected Future<?> taskOutputFuture;
-
- protected Future<?> podLogOutputFuture;
-
- public AbstractCommandExecutor(Consumer<LinkedBlockingQueue<String>>
logHandler,
- TaskExecutionContext taskRequest) {
- this.logHandler = logHandler;
+ public AbstractCommandExecutor(TaskExecutionContext taskRequest) {
this.taskRequest = taskRequest;
- this.logBuffer = new LinkedBlockingQueue<>();
- this.logBuffer.add(EMPTY_STRING);
-
}
// todo: We need to build the IShellActuator in outer class, since
different task may have specific logic to build
@@ -135,10 +103,10 @@ public abstract class AbstractCommandExecutor {
process = iShellInterceptor.execute();
// parse process output
- parseProcessOutput(this.process);
+ final CompletableFuture<Void> collectProcessLogFuture =
collectProcessLog(this.process);
// collect pod log
- collectPodLogIfNeeded();
+ final Optional<CompletableFuture<?>> collectPodLogFuture =
collectPodLogIfNeeded();
int processId = getProcessId(this.process);
@@ -164,24 +132,14 @@ public abstract class AbstractCommandExecutor {
TaskExecutionStatus kubernetesStatus =
ProcessUtils.getApplicationStatus(taskRequest.getK8sTaskExecutionContext(),
taskRequest.getTaskAppId());
- if (taskOutputFuture != null) {
- try {
- // Wait the task log process finished.
- taskOutputFuture.get();
- } catch (ExecutionException e) {
- log.error("Handle task log error", e);
- }
- }
+ // Wait the task log process finished.
+ collectProcessLogFuture.join();
- if (podLogOutputFuture != null) {
- try {
- // Wait kubernetes pod log collection finished
- podLogOutputFuture.get();
- // delete pod after successful execution and log collection
- ProcessUtils.cancelApplication(taskRequest);
- } catch (ExecutionException e) {
- log.error("Handle pod log error", e);
- }
+ if (collectPodLogFuture.isPresent()) {
+ // Wait kubernetes pod log collection finished
+ collectPodLogFuture.get().join();
+ // delete pod after successful execution and log collection
+ ProcessUtils.cancelApplication(taskRequest);
}
// if SHELL task exit
@@ -227,16 +185,15 @@ public abstract class AbstractCommandExecutor {
ProcessUtils.cancelApplication(taskRequest);
}
- private void collectPodLogIfNeeded() {
+ private Optional<CompletableFuture<?>> collectPodLogIfNeeded() {
if (null == taskRequest.getK8sTaskExecutionContext()) {
- podLogOutputIsFinished = true;
- return;
+ return Optional.empty();
}
ExecutorService collectPodLogExecutorService = ThreadUtils
.newSingleDaemonScheduledExecutorService("CollectPodLogOutput-thread-" +
taskRequest.getTaskName());
- podLogOutputFuture = collectPodLogExecutorService.submit(() -> {
+ final CompletableFuture<Void> collectPodLogFuture =
CompletableFuture.runAsync(() -> {
// wait for launching (driver) pod
ThreadUtils.sleep(SLEEP_TIME_MILLIS * 5L);
try (
@@ -248,67 +205,43 @@ public abstract class AbstractCommandExecutor {
String line;
try (BufferedReader reader = new BufferedReader(new
InputStreamReader(watcher.getOutput()))) {
while ((line = reader.readLine()) != null) {
- logBuffer.add(String.format("[K8S-pod-log-%s]:
%s", taskRequest.getTaskName(), line));
+ log.info("[K8S-pod-log-{}]: {}",
taskRequest.getTaskName(), line);
}
}
}
} catch (Exception e) {
+ log.error("Collect pod log error", e);
throw new RuntimeException(e);
- } finally {
- podLogOutputIsFinished = true;
}
-
- });
+ }, collectPodLogExecutorService);
collectPodLogExecutorService.shutdown();
+ return Optional.of(collectPodLogFuture);
}
- private void parseProcessOutput(Process process) {
+ private CompletableFuture<Void> collectProcessLog(Process process) {
// todo: remove this this thread pool.
- ExecutorService getOutputLogService = ThreadUtils
-
.newSingleDaemonScheduledExecutorService("ResolveOutputLog-thread-" +
taskRequest.getTaskName());
- getOutputLogService.execute(() -> {
+ final ExecutorService collectProcessLogService =
ThreadUtils.newSingleDaemonScheduledExecutorService(
+ "ResolveOutputLog-thread-" + taskRequest.getTaskName());
+ final CompletableFuture<Void> collectProcessLogFuture =
CompletableFuture.runAsync(() -> {
TaskOutputParameterParser taskOutputParameterParser = new
TaskOutputParameterParser();
try (BufferedReader inReader = new BufferedReader(new
InputStreamReader(process.getInputStream()))) {
LogUtils.setTaskInstanceLogFullPathMDC(taskRequest.getLogPath());
String line;
while ((line = inReader.readLine()) != null) {
- logBuffer.add(line);
+ log.info(" -> {}", line);
taskOutputParameterParser.appendParseLog(line);
}
- processLogOutputIsSuccess = true;
} catch (Exception e) {
log.error("Parse var pool error", e);
- processLogOutputIsSuccess = true;
} finally {
LogUtils.removeTaskInstanceLogFullPathMDC();
}
taskOutputParams = taskOutputParameterParser.getTaskOutputParams();
- });
-
- getOutputLogService.shutdown();
+ }, collectProcessLogService);
- ExecutorService parseProcessOutputExecutorService = ThreadUtils
-
.newSingleDaemonScheduledExecutorService("TaskInstanceLogOutput-thread-" +
taskRequest.getTaskName());
- taskOutputFuture = parseProcessOutputExecutorService.submit(() -> {
- try {
-
LogUtils.setTaskInstanceLogFullPathMDC(taskRequest.getLogPath());
- while (logBuffer.size() > 1 || !processLogOutputIsSuccess ||
!podLogOutputIsFinished) {
- if (logBuffer.size() > 1) {
- logHandler.accept(logBuffer);
- logBuffer.clear();
- logBuffer.add(EMPTY_STRING);
- } else {
- Thread.sleep(TaskConstants.DEFAULT_LOG_FLUSH_INTERVAL);
- }
- }
- } catch (Exception e) {
- log.error("Output task log error", e);
- } finally {
- LogUtils.removeTaskInstanceLogFullPathMDC();
- }
- });
- parseProcessOutputExecutorService.shutdown();
+ collectProcessLogService.shutdown();
+ return collectProcessLogFuture;
}
/**
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTask.java
index afb0752df7..006f40aa14 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTask.java
@@ -23,8 +23,6 @@ import
org.apache.dolphinscheduler.plugin.task.api.model.TaskAlertInfo;
import
org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import java.util.Map;
-import java.util.StringJoiner;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.regex.Matcher;
import lombok.Getter;
@@ -149,20 +147,6 @@ public abstract class AbstractTask {
return TaskExecutionStatus.FAILURE;
}
- /**
- * log handle
- *
- * @param logs log list
- */
- public void logHandle(LinkedBlockingQueue<String> logs) {
-
- StringJoiner joiner = new StringJoiner("\n\t");
- while (!logs.isEmpty()) {
- joiner.add(logs.poll());
- }
- log.info(" -> {}", joiner);
- }
-
/**
* regular expressions match the contents between two specified strings
*
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java
index d4b98afd67..d58ba941f5 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java
@@ -34,11 +34,11 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
public abstract class AbstractYarnTask extends AbstractRemoteTask {
- private ShellCommandExecutor shellCommandExecutor;
+ private final ShellCommandExecutor shellCommandExecutor;
public AbstractYarnTask(TaskExecutionContext taskRequest) {
super(taskRequest);
- this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
taskRequest);
+ this.shellCommandExecutor = new ShellCommandExecutor(taskRequest);
}
// todo split handle to submit and track
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/ShellCommandExecutor.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/ShellCommandExecutor.java
index c9834adb86..3cea54ec61 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/ShellCommandExecutor.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/ShellCommandExecutor.java
@@ -17,14 +17,10 @@
package org.apache.dolphinscheduler.plugin.task.api;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.function.Consumer;
-
public class ShellCommandExecutor extends AbstractCommandExecutor {
- public ShellCommandExecutor(Consumer<LinkedBlockingQueue<String>>
logHandler,
- TaskExecutionContext taskRequest) {
- super(logHandler, taskRequest);
+ public ShellCommandExecutor(TaskExecutionContext taskRequest) {
+ super(taskRequest);
}
}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/src/main/java/org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/src/main/java/org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunTask.java
index e3814a076d..28c4585372 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/src/main/java/org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/src/main/java/org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunTask.java
@@ -60,26 +60,13 @@ public class ChunJunTask extends AbstractTask {
*/
private static final String CHUNJUN_DIST_DIR =
"${CHUNJUN_HOME}/chunjun-dist";
- /**
- * chunJun parameters
- */
private ChunJunParameters chunJunParameters;
- /**
- * shell command executor
- */
- private ShellCommandExecutor shellCommandExecutor;
-
- /**
- * taskExecutionContext
- */
- private TaskExecutionContext taskExecutionContext;
+ private final ShellCommandExecutor shellCommandExecutor;
public ChunJunTask(TaskExecutionContext taskExecutionContext) {
super(taskExecutionContext);
- this.taskExecutionContext = taskExecutionContext;
-
- this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
taskExecutionContext);
+ this.shellCommandExecutor = new
ShellCommandExecutor(taskExecutionContext);
}
/**
@@ -87,9 +74,8 @@ public class ChunJunTask extends AbstractTask {
*/
@Override
public void init() {
- chunJunParameters =
JSONUtils.parseObject(taskExecutionContext.getTaskParams(),
ChunJunParameters.class);
- log.info("Initialize chunjun task params {}",
-
JSONUtils.toPrettyJsonString(taskExecutionContext.getTaskParams()));
+ chunJunParameters = JSONUtils.parseObject(taskRequest.getTaskParams(),
ChunJunParameters.class);
+ log.info("Initialize chunjun task params {}",
JSONUtils.toPrettyJsonString(taskRequest.getTaskParams()));
if (!chunJunParameters.checkParameters()) {
throw new RuntimeException("chunjun task params is not valid");
@@ -100,7 +86,7 @@ public class ChunJunTask extends AbstractTask {
@Override
public void handle(TaskCallBack taskCallBack) throws TaskException {
try {
- Map<String, Property> paramsMap =
taskExecutionContext.getPrepareParamsMap();
+ Map<String, Property> paramsMap =
taskRequest.getPrepareParamsMap();
IShellInterceptorBuilder<?, ?> shellActuatorBuilder =
ShellInterceptorBuilderFactory.newBuilder()
.properties(ParameterUtils.convert(paramsMap))
@@ -133,9 +119,7 @@ public class ChunJunTask extends AbstractTask {
*/
private String buildChunJunJsonFile(Map<String, Property> paramsMap)
throws Exception {
// generate json
- String fileName = String.format("%s/%s_job.json",
- taskExecutionContext.getExecutePath(),
- taskExecutionContext.getTaskAppId());
+ String fileName = String.format("%s/%s_job.json",
taskRequest.getExecutePath(), taskRequest.getTaskAppId());
String json = null;
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java
index d8aadffe96..fd173eb4ef 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java
@@ -103,33 +103,16 @@ public class DataxTask extends AbstractTask {
*/
private static final int DATAX_CHANNEL_COUNT = 1;
- /**
- * datax parameters
- */
private DataxParameters dataXParameters;
- /**
- * shell command executor
- */
- private ShellCommandExecutor shellCommandExecutor;
-
- /**
- * taskExecutionContext
- */
- private TaskExecutionContext taskExecutionContext;
+ private final ShellCommandExecutor shellCommandExecutor;
private DataxTaskExecutionContext dataxTaskExecutionContext;
- /**
- * constructor
- *
- * @param taskExecutionContext taskExecutionContext
- */
public DataxTask(TaskExecutionContext taskExecutionContext) {
super(taskExecutionContext);
- this.taskExecutionContext = taskExecutionContext;
- this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
taskExecutionContext);
+ this.shellCommandExecutor = new
ShellCommandExecutor(taskExecutionContext);
}
/**
@@ -137,7 +120,7 @@ public class DataxTask extends AbstractTask {
*/
@Override
public void init() {
- dataXParameters =
JSONUtils.parseObject(taskExecutionContext.getTaskParams(),
DataxParameters.class);
+ dataXParameters = JSONUtils.parseObject(taskRequest.getTaskParams(),
DataxParameters.class);
log.info("Initialize datax task params {}",
JSONUtils.toPrettyJsonString(dataXParameters));
if (dataXParameters == null || !dataXParameters.checkParameters()) {
@@ -145,7 +128,7 @@ public class DataxTask extends AbstractTask {
}
SensitiveDataConverter.addMaskPattern(POST_JDBC_INFO_REGEX);
dataxTaskExecutionContext =
-
dataXParameters.generateExtendedContext(taskExecutionContext.getResourceParametersHelper());
+
dataXParameters.generateExtendedContext(taskRequest.getResourceParametersHelper());
}
@SuppressWarnings("unchecked")
@@ -153,7 +136,7 @@ public class DataxTask extends AbstractTask {
public void handle(TaskCallBack taskCallBack) throws TaskException {
try {
// replace placeholder,and combine local and global parameters
- Map<String, Property> paramsMap =
taskExecutionContext.getPrepareParamsMap();
+ Map<String, Property> paramsMap =
taskRequest.getPrepareParamsMap();
IShellInterceptorBuilder<?, ?> shellActuatorBuilder =
ShellInterceptorBuilderFactory.newBuilder()
.properties(ParameterUtils.convert(paramsMap))
@@ -198,9 +181,7 @@ public class DataxTask extends AbstractTask {
*/
private String buildDataxJsonFile(Map<String, Property> paramsMap) throws
Exception {
// generate json
- String fileName = String.format("%s/%s_job.json",
- taskExecutionContext.getExecutePath(),
- taskExecutionContext.getTaskAppId());
+ String fileName = String.format("%s/%s_job.json",
taskRequest.getExecutePath(), taskRequest.getTaskAppId());
String json;
Path path = new File(fileName).toPath();
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java
index d1a8bf8b1b..1086563606 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java
@@ -38,37 +38,19 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
public class DvcTask extends AbstractTask {
- /**
- * dvc parameters
- */
private DvcParameters parameters;
- /**
- * shell command executor
- */
- private ShellCommandExecutor shellCommandExecutor;
-
- /**
- * taskExecutionContext
- */
- private TaskExecutionContext taskExecutionContext;
-
- /**
- * constructor
- *
- * @param taskExecutionContext taskExecutionContext
- */
+ private final ShellCommandExecutor shellCommandExecutor;
+
public DvcTask(TaskExecutionContext taskExecutionContext) {
super(taskExecutionContext);
-
- this.taskExecutionContext = taskExecutionContext;
- this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
taskExecutionContext);
+ this.shellCommandExecutor = new
ShellCommandExecutor(taskExecutionContext);
}
@Override
public void init() {
- parameters =
JSONUtils.parseObject(taskExecutionContext.getTaskParams(),
DvcParameters.class);
+ parameters = JSONUtils.parseObject(taskRequest.getTaskParams(),
DvcParameters.class);
log.info("Initialize dvc task params {}",
JSONUtils.toPrettyJsonString(parameters));
if (parameters == null || !parameters.checkParameters()) {
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-hivecli/src/main/java/org/apache/dolphinscheduler/plugin/task/hivecli/HiveCliTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-hivecli/src/main/java/org/apache/dolphinscheduler/plugin/task/hivecli/HiveCliTask.java
index 6630bfb8b7..94e562cff1 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-hivecli/src/main/java/org/apache/dolphinscheduler/plugin/task/hivecli/HiveCliTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-hivecli/src/main/java/org/apache/dolphinscheduler/plugin/task/hivecli/HiveCliTask.java
@@ -63,13 +63,9 @@ public class HiveCliTask extends AbstractRemoteTask {
private final ShellCommandExecutor shellCommandExecutor;
- private final TaskExecutionContext taskExecutionContext;
-
public HiveCliTask(TaskExecutionContext taskExecutionContext) {
super(taskExecutionContext);
- this.taskExecutionContext = taskExecutionContext;
-
- this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
taskExecutionContext);
+ this.shellCommandExecutor = new
ShellCommandExecutor(taskExecutionContext);
}
@Override
@@ -79,9 +75,9 @@ public class HiveCliTask extends AbstractRemoteTask {
@Override
public void init() {
- log.info("hiveCli task params {}",
taskExecutionContext.getTaskParams());
+ log.info("hiveCli task params {}", taskRequest.getTaskParams());
- hiveCliParameters =
JSONUtils.parseObject(taskExecutionContext.getTaskParams(),
HiveCliParameters.class);
+ hiveCliParameters = JSONUtils.parseObject(taskRequest.getTaskParams(),
HiveCliParameters.class);
if (!hiveCliParameters.checkParameters()) {
throw new TaskException("hiveCli task params is not valid");
@@ -138,7 +134,7 @@ public class HiveCliTask extends AbstractRemoteTask {
try {
resourceFileName = resourceInfos.get(0).getResourceName();
- ResourceContext resourceContext =
taskExecutionContext.getResourceContext();
+ ResourceContext resourceContext =
taskRequest.getResourceContext();
sqlContent = FileUtils.readFileToString(
new
File(resourceContext.getResourceItem(resourceFileName).getResourceAbsolutePathInLocal()),
StandardCharsets.UTF_8);
@@ -150,7 +146,7 @@ public class HiveCliTask extends AbstractRemoteTask {
sqlContent = hiveCliParameters.getHiveSqlScript();
}
- final Map<String, Property> paramsMap =
taskExecutionContext.getPrepareParamsMap();
+ final Map<String, Property> paramsMap =
taskRequest.getPrepareParamsMap();
sqlContent = ParameterUtils.convertParameterPlaceholders(sqlContent,
ParameterUtils.convert(paramsMap));
log.info("HiveCli sql content: {}", sqlContent);
String sqlFilePath = generateSqlScriptFile(sqlContent);
@@ -184,8 +180,8 @@ public class HiveCliTask extends AbstractRemoteTask {
}
protected String generateSqlScriptFile(String rawScript) {
- String scriptFileName = String.format("%s/%s_node.sql",
taskExecutionContext.getExecutePath(),
- taskExecutionContext.getTaskAppId());
+ String scriptFileName = String.format("%s/%s_node.sql",
taskRequest.getExecutePath(),
+ taskRequest.getTaskAppId());
File file = new File(scriptFileName);
Path path = file.toPath();
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/main/java/org/apache/dolphinscheduler/plugin/task/java/JavaTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/main/java/org/apache/dolphinscheduler/plugin/task/java/JavaTask.java
index ab32c87dd5..5e958026ca 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/main/java/org/apache/dolphinscheduler/plugin/task/java/JavaTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/main/java/org/apache/dolphinscheduler/plugin/task/java/JavaTask.java
@@ -46,25 +46,14 @@ import com.google.common.base.Preconditions;
@Slf4j
public class JavaTask extends AbstractTask {
- /**
- * Contains various parameters for this task
- */
private JavaParameters javaParameters;
- /**
- * To run shell commands
- */
- private ShellCommandExecutor shellCommandExecutor;
-
- /**
- * task execution context
- */
- private TaskExecutionContext taskRequest;
+ private final ShellCommandExecutor shellCommandExecutor;
public JavaTask(TaskExecutionContext taskRequest) {
super(taskRequest);
this.taskRequest = taskRequest;
- this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
taskRequest);
+ this.shellCommandExecutor = new ShellCommandExecutor(taskRequest);
}
/**
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java
index 5f79b139a9..f55da63f32 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java
@@ -50,14 +50,11 @@ public class JupyterTask extends AbstractRemoteTask {
private JupyterParameters jupyterParameters;
- private TaskExecutionContext taskExecutionContext;
-
private ShellCommandExecutor shellCommandExecutor;
public JupyterTask(TaskExecutionContext taskExecutionContext) {
super(taskExecutionContext);
- this.taskExecutionContext = taskExecutionContext;
- this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
taskExecutionContext);
+ this.shellCommandExecutor = new
ShellCommandExecutor(taskExecutionContext);
}
@Override
@@ -68,7 +65,7 @@ public class JupyterTask extends AbstractRemoteTask {
@Override
public void init() {
- jupyterParameters =
JSONUtils.parseObject(taskExecutionContext.getTaskParams(),
JupyterParameters.class);
+ jupyterParameters = JSONUtils.parseObject(taskRequest.getTaskParams(),
JupyterParameters.class);
log.info("Initialize jupyter task params {}",
JSONUtils.toPrettyJsonString(jupyterParameters));
if (null == jupyterParameters) {
@@ -86,7 +83,7 @@ public class JupyterTask extends AbstractRemoteTask {
public void handle(TaskCallBack taskCallBack) throws TaskException {
try {
IShellInterceptorBuilder<?, ?> shellActuatorBuilder =
ShellInterceptorBuilderFactory.newBuilder()
-
.properties(ParameterUtils.convert(taskExecutionContext.getPrepareParamsMap()))
+
.properties(ParameterUtils.convert(taskRequest.getPrepareParamsMap()))
.appendScript(buildCommand());
TaskResponse response =
shellCommandExecutor.run(shellActuatorBuilder, taskCallBack);
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-linkis/src/main/java/org/apache/dolphinscheduler/plugin/task/linkis/LinkisTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-linkis/src/main/java/org/apache/dolphinscheduler/plugin/task/linkis/LinkisTask.java
index a4ab0b96a9..8f4a346abd 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-linkis/src/main/java/org/apache/dolphinscheduler/plugin/task/linkis/LinkisTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-linkis/src/main/java/org/apache/dolphinscheduler/plugin/task/linkis/LinkisTask.java
@@ -46,20 +46,9 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
public class LinkisTask extends AbstractRemoteTask {
- /**
- * linkis parameters
- */
private LinkisParameters linkisParameters;
- /**
- * shell command executor
- */
- private ShellCommandExecutor shellCommandExecutor;
-
- /**
- * taskExecutionContext
- */
- protected final TaskExecutionContext taskExecutionContext;
+ private final ShellCommandExecutor shellCommandExecutor;
private String taskId;
@@ -67,16 +56,9 @@ public class LinkisTask extends AbstractRemoteTask {
protected static final Pattern LINKIS_STATUS_REGEX =
Pattern.compile(Constants.LINKIS_STATUS_REGEX);
- /**
- * constructor
- *
- * @param taskExecutionContext taskExecutionContext
- */
public LinkisTask(TaskExecutionContext taskExecutionContext) {
super(taskExecutionContext);
-
- this.taskExecutionContext = taskExecutionContext;
- this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
taskExecutionContext);
+ this.shellCommandExecutor = new
ShellCommandExecutor(taskExecutionContext);
}
@Override
@@ -86,7 +68,7 @@ public class LinkisTask extends AbstractRemoteTask {
@Override
public void init() {
- linkisParameters =
JSONUtils.parseObject(taskExecutionContext.getTaskParams(),
LinkisParameters.class);
+ linkisParameters = JSONUtils.parseObject(taskRequest.getTaskParams(),
LinkisParameters.class);
log.info("Initialize Linkis task params {}",
JSONUtils.toPrettyJsonString(linkisParameters));
if (!linkisParameters.checkParameters()) {
@@ -99,7 +81,7 @@ public class LinkisTask extends AbstractRemoteTask {
try {
// construct process
IShellInterceptorBuilder<?, ?> shellActuatorBuilder =
ShellInterceptorBuilderFactory.newBuilder()
-
.properties(ParameterUtils.convert(taskExecutionContext.getPrepareParamsMap()))
+
.properties(ParameterUtils.convert(taskRequest.getPrepareParamsMap()))
.appendScript(buildCommand());
TaskResponse commandExecuteResult =
shellCommandExecutor.run(shellActuatorBuilder, null);
setExitStatusCode(commandExecuteResult.getExitStatusCode());
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java
index bc97be04e6..332879d6ce 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java
@@ -48,30 +48,13 @@ import lombok.extern.slf4j.Slf4j;
public class MlflowTask extends AbstractTask {
private static final Pattern GIT_CHECK_PATTERN =
Pattern.compile("^(git@|https?://)");
- /**
- * shell command executor
- */
private final ShellCommandExecutor shellCommandExecutor;
- /**
- * taskExecutionContext
- */
- private final TaskExecutionContext taskExecutionContext;
- /**
- * shell parameters
- */
private MlflowParameters mlflowParameters;
- /**
- * constructor
- *
- * @param taskExecutionContext taskExecutionContext
- */
public MlflowTask(TaskExecutionContext taskExecutionContext) {
super(taskExecutionContext);
-
- this.taskExecutionContext = taskExecutionContext;
- this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
taskExecutionContext);
+ this.shellCommandExecutor = new
ShellCommandExecutor(taskExecutionContext);
}
static public String getPresetRepository() {
@@ -105,7 +88,7 @@ public class MlflowTask extends AbstractTask {
@Override
public void init() {
- mlflowParameters =
JSONUtils.parseObject(taskExecutionContext.getTaskParams(),
MlflowParameters.class);
+ mlflowParameters = JSONUtils.parseObject(taskRequest.getTaskParams(),
MlflowParameters.class);
log.info("Initialize MLFlow task params {}",
JSONUtils.toPrettyJsonString(mlflowParameters));
if (mlflowParameters == null || !mlflowParameters.checkParameters()) {
@@ -254,7 +237,7 @@ public class MlflowTask extends AbstractTask {
}
private Map<String, Property> getParamsMap() {
- return taskExecutionContext.getPrepareParamsMap();
+ return taskRequest.getPrepareParamsMap();
}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java
index 715438f6e6..dc4381adf0 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java
@@ -49,22 +49,14 @@ public class PythonTask extends AbstractTask {
protected PythonParameters pythonParameters;
- private ShellCommandExecutor shellCommandExecutor;
-
- protected TaskExecutionContext taskRequest;
+ private final ShellCommandExecutor shellCommandExecutor;
protected static final String PYTHON_LAUNCHER = "PYTHON_LAUNCHER";
- /**
- * constructor
- *
- * @param taskRequest taskRequest
- */
public PythonTask(TaskExecutionContext taskRequest) {
super(taskRequest);
- this.taskRequest = taskRequest;
- this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
taskRequest);
+ this.shellCommandExecutor = new ShellCommandExecutor(taskRequest);
}
@Override
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTask.java
index 1d7a874dfc..14422ba4da 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTask.java
@@ -41,21 +41,19 @@ public class PytorchTask extends AbstractTask {
private final ShellCommandExecutor shellCommandExecutor;
protected PytorchParameters pytorchParameters;
- protected TaskExecutionContext taskExecutionContext;
private PythonEnvManager pythonEnvManager;
public PytorchTask(TaskExecutionContext taskExecutionContext) {
super(taskExecutionContext);
- this.taskExecutionContext = taskExecutionContext;
- this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
taskExecutionContext);
+ this.shellCommandExecutor = new
ShellCommandExecutor(taskExecutionContext);
}
@Override
public void init() {
- pytorchParameters =
JSONUtils.parseObject(taskExecutionContext.getTaskParams(),
PytorchParameters.class);
- log.info("Initialize pytorch task params {}",
JSONUtils.toPrettyJsonString(taskExecutionContext));
+ pytorchParameters = JSONUtils.parseObject(taskRequest.getTaskParams(),
PytorchParameters.class);
+ log.info("Initialize pytorch task params {}",
JSONUtils.toPrettyJsonString(taskRequest));
if (pytorchParameters == null || !pytorchParameters.checkParameters())
{
throw new TaskException("python task params is not valid");
@@ -71,7 +69,7 @@ public class PytorchTask extends AbstractTask {
public void handle(TaskCallBack taskCallBack) throws TaskException {
try {
IShellInterceptorBuilder<?, ?> shellActuatorBuilder =
ShellInterceptorBuilderFactory.newBuilder()
-
.properties(ParameterUtils.convert(taskExecutionContext.getPrepareParamsMap()))
+
.properties(ParameterUtils.convert(taskRequest.getPrepareParamsMap()))
.appendScript(buildPythonExecuteCommand());
TaskResponse taskResponse =
shellCommandExecutor.run(shellActuatorBuilder, taskCallBack);
@@ -102,7 +100,7 @@ public class PytorchTask extends AbstractTask {
if (GitProjectManager.isGitPath(pythonPath)) {
GitProjectManager gpm = new GitProjectManager();
gpm.setPath(pythonPath);
- gpm.setBaseDir(taskExecutionContext.getExecutePath());
+ gpm.setBaseDir(taskRequest.getExecutePath());
gpm.prepareProject();
pytorchParameters.setPythonPath(gpm.getGitLocalPath());
}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java
index da9efbfec0..c0b37057ea 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java
@@ -57,31 +57,14 @@ public class SeatunnelTask extends AbstractRemoteTask {
private static final String SEATUNNEL_BIN_DIR = "${SEATUNNEL_HOME}/bin/";
- /**
- * seatunnel parameters
- */
private SeatunnelParameters seatunnelParameters;
- /**
- * shell command executor
- */
- private ShellCommandExecutor shellCommandExecutor;
-
- /**
- * taskExecutionContext
- */
- protected final TaskExecutionContext taskExecutionContext;
-
- /**
- * constructor
- *
- * @param taskExecutionContext taskExecutionContext
- */
+ private final ShellCommandExecutor shellCommandExecutor;
+
public SeatunnelTask(TaskExecutionContext taskExecutionContext) {
super(taskExecutionContext);
- this.taskExecutionContext = taskExecutionContext;
- this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
taskExecutionContext);
+ this.shellCommandExecutor = new
ShellCommandExecutor(taskExecutionContext);
}
@Override
@@ -163,7 +146,7 @@ public class SeatunnelTask extends AbstractRemoteTask {
scriptContent = buildCustomConfigContent();
} else {
String resourceFileName =
seatunnelParameters.getResourceList().get(0).getResourceName();
- ResourceContext resourceContext =
taskExecutionContext.getResourceContext();
+ ResourceContext resourceContext = taskRequest.getResourceContext();
scriptContent = FileUtils.readFileToString(
new
File(resourceContext.getResourceItem(resourceFileName).getResourceAbsolutePathInLocal()),
StandardCharsets.UTF_8);
@@ -177,8 +160,8 @@ public class SeatunnelTask extends AbstractRemoteTask {
private List<String> generateTaskParameters() {
Map<String, String> variables = new HashMap<>();
- Map<String, Property> paramsMap =
taskExecutionContext.getPrepareParamsMap();
- List<Property> propertyList =
JSONUtils.toList(taskExecutionContext.getGlobalParams(), Property.class);
+ Map<String, Property> paramsMap = taskRequest.getPrepareParamsMap();
+ List<Property> propertyList =
JSONUtils.toList(taskRequest.getGlobalParams(), Property.class);
if (propertyList != null && !propertyList.isEmpty()) {
for (Property property : propertyList) {
variables.put(property.getProp(),
paramsMap.get(property.getProp()).getValue());
@@ -208,8 +191,8 @@ public class SeatunnelTask extends AbstractRemoteTask {
}
private String buildConfigFilePath() {
- return String.format("%s/seatunnel_%s.%s",
taskExecutionContext.getExecutePath(),
- taskExecutionContext.getTaskAppId(), formatDetector());
+ return String.format("%s/seatunnel_%s.%s",
taskRequest.getExecutePath(),
+ taskRequest.getTaskAppId(), formatDetector());
}
private String formatDetector() {
@@ -218,8 +201,8 @@ public class SeatunnelTask extends AbstractRemoteTask {
}
private void createConfigFileIfNotExists(String script, String scriptFile)
throws IOException {
- log.info("tenantCode :{}, task dir:{}",
taskExecutionContext.getTenantCode(),
- taskExecutionContext.getExecutePath());
+ log.info("tenantCode :{}, task dir:{}", taskRequest.getTenantCode(),
+ taskRequest.getExecutePath());
if (!Files.exists(Paths.get(scriptFile))) {
log.info("generate script file:{}", scriptFile);
@@ -235,7 +218,7 @@ public class SeatunnelTask extends AbstractRemoteTask {
}
private String parseScript(String script) {
- Map<String, Property> paramsMap =
taskExecutionContext.getPrepareParamsMap();
+ Map<String, Property> paramsMap = taskRequest.getPrepareParamsMap();
return ParameterUtils.convertParameterPlaceholders(script,
ParameterUtils.convert(paramsMap));
}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/flink/SeatunnelFlinkTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/flink/SeatunnelFlinkTask.java
index 7918c6d183..492b592f70 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/flink/SeatunnelFlinkTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/flink/SeatunnelFlinkTask.java
@@ -36,7 +36,7 @@ public class SeatunnelFlinkTask extends SeatunnelTask {
@Override
public void init() {
seatunnelParameters =
- JSONUtils.parseObject(taskExecutionContext.getTaskParams(),
SeatunnelFlinkParameters.class);
+ JSONUtils.parseObject(taskRequest.getTaskParams(),
SeatunnelFlinkParameters.class);
setSeatunnelParameters(seatunnelParameters);
super.init();
}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/self/SeatunnelEngineTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/self/SeatunnelEngineTask.java
index 1507653da9..f870dfdcd9 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/self/SeatunnelEngineTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/self/SeatunnelEngineTask.java
@@ -37,7 +37,7 @@ public class SeatunnelEngineTask extends SeatunnelTask {
@Override
public void init() {
seatunnelParameters =
- JSONUtils.parseObject(taskExecutionContext.getTaskParams(),
SeatunnelEngineParameters.class);
+ JSONUtils.parseObject(taskRequest.getTaskParams(),
SeatunnelEngineParameters.class);
setSeatunnelParameters(seatunnelParameters);
super.init();
}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/spark/SeatunnelSparkTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/spark/SeatunnelSparkTask.java
index d8b1a69156..cb1e6aeb3e 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/spark/SeatunnelSparkTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/spark/SeatunnelSparkTask.java
@@ -43,7 +43,7 @@ public class SeatunnelSparkTask extends SeatunnelTask {
@Override
public void init() {
seatunnelParameters =
- JSONUtils.parseObject(taskExecutionContext.getTaskParams(),
SeatunnelSparkParameters.class);
+ JSONUtils.parseObject(taskRequest.getTaskParams(),
SeatunnelSparkParameters.class);
setSeatunnelParameters(seatunnelParameters);
super.init();
}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java
index 0d90ab998b..ccb8efb492 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java
@@ -36,37 +36,20 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
public class ShellTask extends AbstractTask {
- /**
- * shell parameters
- */
private ShellParameters shellParameters;
- /**
- * shell command executor
- */
- private ShellCommandExecutor shellCommandExecutor;
-
- /**
- * taskExecutionContext
- */
- private TaskExecutionContext taskExecutionContext;
-
- /**
- * constructor
- *
- * @param taskExecutionContext taskExecutionContext
- */
+ private final ShellCommandExecutor shellCommandExecutor;
+
public ShellTask(TaskExecutionContext taskExecutionContext) {
super(taskExecutionContext);
- this.taskExecutionContext = taskExecutionContext;
- this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
taskExecutionContext);
+ this.shellCommandExecutor = new
ShellCommandExecutor(taskExecutionContext);
}
@Override
public void init() {
- shellParameters =
JSONUtils.parseObject(taskExecutionContext.getTaskParams(),
ShellParameters.class);
+ shellParameters = JSONUtils.parseObject(taskRequest.getTaskParams(),
ShellParameters.class);
log.info("Initialize shell task params {}",
JSONUtils.toPrettyJsonString(shellParameters));
if (shellParameters == null || !shellParameters.checkParameters()) {
@@ -79,14 +62,14 @@ public class ShellTask extends AbstractTask {
public void handle(TaskCallBack taskCallBack) throws TaskException {
try {
IShellInterceptorBuilder<?, ?> shellActuatorBuilder =
ShellInterceptorBuilderFactory.newBuilder()
-
.properties(ParameterUtils.convert(taskExecutionContext.getPrepareParamsMap()))
+
.properties(ParameterUtils.convert(taskRequest.getPrepareParamsMap()))
.appendScript(shellParameters.getRawScript());
TaskResponse commandExecuteResult =
shellCommandExecutor.run(shellActuatorBuilder, taskCallBack);
setExitStatusCode(commandExecuteResult.getExitStatusCode());
setProcessId(commandExecuteResult.getProcessId());
shellParameters.dealOutParam(shellCommandExecutor.getTaskOutputParams());
- taskExecutionContext.setVarPool(shellParameters.getVarPool());
+ taskRequest.setVarPool(shellParameters.getVarPool());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("The current Shell task has been interrupted", e);