This is an automated email from the ASF dual-hosted git repository.

zihaoxiang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 26fe6fe53c [Fix-17780][TaskPlugin] Fix shell output log that might be 
lost due to incorrect usage of log buffer (#17790)
26fe6fe53c is described below

commit 26fe6fe53cd67aacbb0744bf044b2699630f5e16
Author: Wenjun Ruan <[email protected]>
AuthorDate: Mon Dec 22 12:26:03 2025 +0800

    [Fix-17780][TaskPlugin] Fix shell output log that might be lost due to 
incorrect usage of log buffer (#17790)
---
 docs/docs/en/guide/upgrade/incompatible.md         |   1 +
 .../plugin/task/api/AbstractCommandExecutor.java   | 121 +++++----------------
 .../plugin/task/api/AbstractTask.java              |  16 ---
 .../plugin/task/api/AbstractYarnTask.java          |   4 +-
 .../plugin/task/api/ShellCommandExecutor.java      |   8 +-
 .../plugin/task/chunjun/ChunJunTask.java           |  28 +----
 .../plugin/task/datax/DataxTask.java               |  31 +-----
 .../dolphinscheduler/plugin/task/dvc/DvcTask.java  |  26 +----
 .../plugin/task/hivecli/HiveCliTask.java           |  18 ++-
 .../plugin/task/java/JavaTask.java                 |  15 +--
 .../plugin/task/jupyter/JupyterTask.java           |   9 +-
 .../plugin/task/linkis/LinkisTask.java             |  26 +----
 .../plugin/task/mlflow/MlflowTask.java             |  23 +---
 .../plugin/task/python/PythonTask.java             |  12 +-
 .../plugin/task/pytorch/PytorchTask.java           |  12 +-
 .../plugin/task/seatunnel/SeatunnelTask.java       |  39 ++-----
 .../task/seatunnel/flink/SeatunnelFlinkTask.java   |   2 +-
 .../task/seatunnel/self/SeatunnelEngineTask.java   |   2 +-
 .../task/seatunnel/spark/SeatunnelSparkTask.java   |   2 +-
 .../plugin/task/shell/ShellTask.java               |  29 +----
 20 files changed, 94 insertions(+), 330 deletions(-)

diff --git a/docs/docs/en/guide/upgrade/incompatible.md 
b/docs/docs/en/guide/upgrade/incompatible.md
index e30058aef9..508ba65bc5 100644
--- a/docs/docs/en/guide/upgrade/incompatible.md
+++ b/docs/docs/en/guide/upgrade/incompatible.md
@@ -37,4 +37,5 @@ This document records the incompatible updates between each 
version. You need to
 * Renamed the publicKey field to privateKey in the SSH connection parameters 
under the datasource configuration. 
([#17666])(https://github.com/apache/dolphinscheduler/pull/17666)
 * Add table t_ds_serial_command. 
([#17531])(https://github.com/apache/dolphinscheduler/pull/17531)
 * Remove the default value of `python-gateway.auth-token` at 
`api-server/application.yaml`. 
([#17801])(https://github.com/apache/dolphinscheduler/pull/17801)
+* Refactor the task plugins which use ShellCommandExecutor 
([#17790])(https://github.com/apache/dolphinscheduler/pull/17790)
 
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
index 6e891298dd..86199c874e 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
@@ -17,7 +17,6 @@
 
 package org.apache.dolphinscheduler.plugin.task.api;
 
-import static 
org.apache.dolphinscheduler.common.constants.Constants.EMPTY_STRING;
 import static 
org.apache.dolphinscheduler.common.constants.Constants.SLEEP_TIME_MILLIS;
 import static 
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
 import static 
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_HARD_KILL;
@@ -42,12 +41,10 @@ import java.io.InputStreamReader;
 import java.lang.reflect.Field;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.ExecutionException;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
 
 import lombok.extern.slf4j.Slf4j;
 import io.fabric8.kubernetes.client.dsl.LogWatch;
@@ -59,41 +56,12 @@ import io.fabric8.kubernetes.client.dsl.LogWatch;
 public abstract class AbstractCommandExecutor {
 
     protected volatile Map<String, String> taskOutputParams = new HashMap<>();
-    /**
-     * process
-     */
     private Process process;
 
-    /**
-     * log handler
-     */
-    protected Consumer<LinkedBlockingQueue<String>> logHandler;
-
-    /**
-     * log list
-     */
-    protected LinkedBlockingQueue<String> logBuffer;
-
-    protected boolean processLogOutputIsSuccess = false;
-
-    protected boolean podLogOutputIsFinished = false;
-
-    /**
-     * taskRequest
-     */
     protected TaskExecutionContext taskRequest;
 
-    protected Future<?> taskOutputFuture;
-
-    protected Future<?> podLogOutputFuture;
-
-    public AbstractCommandExecutor(Consumer<LinkedBlockingQueue<String>> 
logHandler,
-                                   TaskExecutionContext taskRequest) {
-        this.logHandler = logHandler;
+    public AbstractCommandExecutor(TaskExecutionContext taskRequest) {
         this.taskRequest = taskRequest;
-        this.logBuffer = new LinkedBlockingQueue<>();
-        this.logBuffer.add(EMPTY_STRING);
-
     }
 
     // todo: We need to build the IShellActuator in outer class, since 
different task may have specific logic to build
@@ -135,10 +103,10 @@ public abstract class AbstractCommandExecutor {
         process = iShellInterceptor.execute();
 
         // parse process output
-        parseProcessOutput(this.process);
+        final CompletableFuture<Void> collectProcessLogFuture = 
collectProcessLog(this.process);
 
         // collect pod log
-        collectPodLogIfNeeded();
+        final Optional<CompletableFuture<?>> collectPodLogFuture = 
collectPodLogIfNeeded();
 
         int processId = getProcessId(this.process);
 
@@ -164,24 +132,14 @@ public abstract class AbstractCommandExecutor {
         TaskExecutionStatus kubernetesStatus =
                 
ProcessUtils.getApplicationStatus(taskRequest.getK8sTaskExecutionContext(), 
taskRequest.getTaskAppId());
 
-        if (taskOutputFuture != null) {
-            try {
-                // Wait the task log process finished.
-                taskOutputFuture.get();
-            } catch (ExecutionException e) {
-                log.error("Handle task log error", e);
-            }
-        }
+        // Wait the task log process finished.
+        collectProcessLogFuture.join();
 
-        if (podLogOutputFuture != null) {
-            try {
-                // Wait kubernetes pod log collection finished
-                podLogOutputFuture.get();
-                // delete pod after successful execution and log collection
-                ProcessUtils.cancelApplication(taskRequest);
-            } catch (ExecutionException e) {
-                log.error("Handle pod log error", e);
-            }
+        if (collectPodLogFuture.isPresent()) {
+            // Wait kubernetes pod log collection finished
+            collectPodLogFuture.get().join();
+            // delete pod after successful execution and log collection
+            ProcessUtils.cancelApplication(taskRequest);
         }
 
         // if SHELL task exit
@@ -227,16 +185,15 @@ public abstract class AbstractCommandExecutor {
         ProcessUtils.cancelApplication(taskRequest);
     }
 
-    private void collectPodLogIfNeeded() {
+    private Optional<CompletableFuture<?>> collectPodLogIfNeeded() {
         if (null == taskRequest.getK8sTaskExecutionContext()) {
-            podLogOutputIsFinished = true;
-            return;
+            return Optional.empty();
         }
 
         ExecutorService collectPodLogExecutorService = ThreadUtils
                 
.newSingleDaemonScheduledExecutorService("CollectPodLogOutput-thread-" + 
taskRequest.getTaskName());
 
-        podLogOutputFuture = collectPodLogExecutorService.submit(() -> {
+        final CompletableFuture<Void> collectPodLogFuture = 
CompletableFuture.runAsync(() -> {
             // wait for launching (driver) pod
             ThreadUtils.sleep(SLEEP_TIME_MILLIS * 5L);
             try (
@@ -248,67 +205,43 @@ public abstract class AbstractCommandExecutor {
                     String line;
                     try (BufferedReader reader = new BufferedReader(new 
InputStreamReader(watcher.getOutput()))) {
                         while ((line = reader.readLine()) != null) {
-                            logBuffer.add(String.format("[K8S-pod-log-%s]: 
%s", taskRequest.getTaskName(), line));
+                            log.info("[K8S-pod-log-{}]: {}", 
taskRequest.getTaskName(), line);
                         }
                     }
                 }
             } catch (Exception e) {
+                log.error("Collect pod log error", e);
                 throw new RuntimeException(e);
-            } finally {
-                podLogOutputIsFinished = true;
             }
-
-        });
+        }, collectPodLogExecutorService);
 
         collectPodLogExecutorService.shutdown();
+        return Optional.of(collectPodLogFuture);
     }
 
-    private void parseProcessOutput(Process process) {
+    private CompletableFuture<Void> collectProcessLog(Process process) {
         // todo: remove this this thread pool.
-        ExecutorService getOutputLogService = ThreadUtils
-                
.newSingleDaemonScheduledExecutorService("ResolveOutputLog-thread-" + 
taskRequest.getTaskName());
-        getOutputLogService.execute(() -> {
+        final ExecutorService collectProcessLogService = 
ThreadUtils.newSingleDaemonScheduledExecutorService(
+                "ResolveOutputLog-thread-" + taskRequest.getTaskName());
+        final CompletableFuture<Void> collectProcessLogFuture = 
CompletableFuture.runAsync(() -> {
             TaskOutputParameterParser taskOutputParameterParser = new 
TaskOutputParameterParser();
             try (BufferedReader inReader = new BufferedReader(new 
InputStreamReader(process.getInputStream()))) {
                 
LogUtils.setTaskInstanceLogFullPathMDC(taskRequest.getLogPath());
                 String line;
                 while ((line = inReader.readLine()) != null) {
-                    logBuffer.add(line);
+                    log.info(" -> {}", line);
                     taskOutputParameterParser.appendParseLog(line);
                 }
-                processLogOutputIsSuccess = true;
             } catch (Exception e) {
                 log.error("Parse var pool error", e);
-                processLogOutputIsSuccess = true;
             } finally {
                 LogUtils.removeTaskInstanceLogFullPathMDC();
             }
             taskOutputParams = taskOutputParameterParser.getTaskOutputParams();
-        });
-
-        getOutputLogService.shutdown();
+        }, collectProcessLogService);
 
-        ExecutorService parseProcessOutputExecutorService = ThreadUtils
-                
.newSingleDaemonScheduledExecutorService("TaskInstanceLogOutput-thread-" + 
taskRequest.getTaskName());
-        taskOutputFuture = parseProcessOutputExecutorService.submit(() -> {
-            try {
-                
LogUtils.setTaskInstanceLogFullPathMDC(taskRequest.getLogPath());
-                while (logBuffer.size() > 1 || !processLogOutputIsSuccess || 
!podLogOutputIsFinished) {
-                    if (logBuffer.size() > 1) {
-                        logHandler.accept(logBuffer);
-                        logBuffer.clear();
-                        logBuffer.add(EMPTY_STRING);
-                    } else {
-                        Thread.sleep(TaskConstants.DEFAULT_LOG_FLUSH_INTERVAL);
-                    }
-                }
-            } catch (Exception e) {
-                log.error("Output task log error", e);
-            } finally {
-                LogUtils.removeTaskInstanceLogFullPathMDC();
-            }
-        });
-        parseProcessOutputExecutorService.shutdown();
+        collectProcessLogService.shutdown();
+        return collectProcessLogFuture;
     }
 
     /**
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTask.java
index afb0752df7..006f40aa14 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTask.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTask.java
@@ -23,8 +23,6 @@ import 
org.apache.dolphinscheduler.plugin.task.api.model.TaskAlertInfo;
 import 
org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
 
 import java.util.Map;
-import java.util.StringJoiner;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.regex.Matcher;
 
 import lombok.Getter;
@@ -149,20 +147,6 @@ public abstract class AbstractTask {
         return TaskExecutionStatus.FAILURE;
     }
 
-    /**
-     * log handle
-     *
-     * @param logs log list
-     */
-    public void logHandle(LinkedBlockingQueue<String> logs) {
-
-        StringJoiner joiner = new StringJoiner("\n\t");
-        while (!logs.isEmpty()) {
-            joiner.add(logs.poll());
-        }
-        log.info(" -> {}", joiner);
-    }
-
     /**
      * regular expressions match the contents between two specified strings
      *
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java
index d4b98afd67..d58ba941f5 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java
@@ -34,11 +34,11 @@ import lombok.extern.slf4j.Slf4j;
 @Slf4j
 public abstract class AbstractYarnTask extends AbstractRemoteTask {
 
-    private ShellCommandExecutor shellCommandExecutor;
+    private final ShellCommandExecutor shellCommandExecutor;
 
     public AbstractYarnTask(TaskExecutionContext taskRequest) {
         super(taskRequest);
-        this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, 
taskRequest);
+        this.shellCommandExecutor = new ShellCommandExecutor(taskRequest);
     }
 
     // todo split handle to submit and track
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/ShellCommandExecutor.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/ShellCommandExecutor.java
index c9834adb86..3cea54ec61 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/ShellCommandExecutor.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/ShellCommandExecutor.java
@@ -17,14 +17,10 @@
 
 package org.apache.dolphinscheduler.plugin.task.api;
 
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.function.Consumer;
-
 public class ShellCommandExecutor extends AbstractCommandExecutor {
 
-    public ShellCommandExecutor(Consumer<LinkedBlockingQueue<String>> 
logHandler,
-                                TaskExecutionContext taskRequest) {
-        super(logHandler, taskRequest);
+    public ShellCommandExecutor(TaskExecutionContext taskRequest) {
+        super(taskRequest);
     }
 
 }
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/src/main/java/org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/src/main/java/org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunTask.java
index e3814a076d..28c4585372 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/src/main/java/org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunTask.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/src/main/java/org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunTask.java
@@ -60,26 +60,13 @@ public class ChunJunTask extends AbstractTask {
      */
     private static final String CHUNJUN_DIST_DIR = 
"${CHUNJUN_HOME}/chunjun-dist";
 
-    /**
-     * chunJun parameters
-     */
     private ChunJunParameters chunJunParameters;
 
-    /**
-     * shell command executor
-     */
-    private ShellCommandExecutor shellCommandExecutor;
-
-    /**
-     * taskExecutionContext
-     */
-    private TaskExecutionContext taskExecutionContext;
+    private final ShellCommandExecutor shellCommandExecutor;
 
     public ChunJunTask(TaskExecutionContext taskExecutionContext) {
         super(taskExecutionContext);
-        this.taskExecutionContext = taskExecutionContext;
-
-        this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, 
taskExecutionContext);
+        this.shellCommandExecutor = new 
ShellCommandExecutor(taskExecutionContext);
     }
 
     /**
@@ -87,9 +74,8 @@ public class ChunJunTask extends AbstractTask {
      */
     @Override
     public void init() {
-        chunJunParameters = 
JSONUtils.parseObject(taskExecutionContext.getTaskParams(), 
ChunJunParameters.class);
-        log.info("Initialize chunjun task params {}",
-                
JSONUtils.toPrettyJsonString(taskExecutionContext.getTaskParams()));
+        chunJunParameters = JSONUtils.parseObject(taskRequest.getTaskParams(), 
ChunJunParameters.class);
+        log.info("Initialize chunjun task params {}", 
JSONUtils.toPrettyJsonString(taskRequest.getTaskParams()));
 
         if (!chunJunParameters.checkParameters()) {
             throw new RuntimeException("chunjun task params is not valid");
@@ -100,7 +86,7 @@ public class ChunJunTask extends AbstractTask {
     @Override
     public void handle(TaskCallBack taskCallBack) throws TaskException {
         try {
-            Map<String, Property> paramsMap = 
taskExecutionContext.getPrepareParamsMap();
+            Map<String, Property> paramsMap = 
taskRequest.getPrepareParamsMap();
 
             IShellInterceptorBuilder<?, ?> shellActuatorBuilder = 
ShellInterceptorBuilderFactory.newBuilder()
                     .properties(ParameterUtils.convert(paramsMap))
@@ -133,9 +119,7 @@ public class ChunJunTask extends AbstractTask {
      */
     private String buildChunJunJsonFile(Map<String, Property> paramsMap) 
throws Exception {
         // generate json
-        String fileName = String.format("%s/%s_job.json",
-                taskExecutionContext.getExecutePath(),
-                taskExecutionContext.getTaskAppId());
+        String fileName = String.format("%s/%s_job.json", 
taskRequest.getExecutePath(), taskRequest.getTaskAppId());
 
         String json = null;
 
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java
index d8aadffe96..fd173eb4ef 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java
@@ -103,33 +103,16 @@ public class DataxTask extends AbstractTask {
      */
     private static final int DATAX_CHANNEL_COUNT = 1;
 
-    /**
-     * datax parameters
-     */
     private DataxParameters dataXParameters;
 
-    /**
-     * shell command executor
-     */
-    private ShellCommandExecutor shellCommandExecutor;
-
-    /**
-     * taskExecutionContext
-     */
-    private TaskExecutionContext taskExecutionContext;
+    private final ShellCommandExecutor shellCommandExecutor;
 
     private DataxTaskExecutionContext dataxTaskExecutionContext;
 
-    /**
-     * constructor
-     *
-     * @param taskExecutionContext taskExecutionContext
-     */
     public DataxTask(TaskExecutionContext taskExecutionContext) {
         super(taskExecutionContext);
-        this.taskExecutionContext = taskExecutionContext;
 
-        this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, 
taskExecutionContext);
+        this.shellCommandExecutor = new 
ShellCommandExecutor(taskExecutionContext);
     }
 
     /**
@@ -137,7 +120,7 @@ public class DataxTask extends AbstractTask {
      */
     @Override
     public void init() {
-        dataXParameters = 
JSONUtils.parseObject(taskExecutionContext.getTaskParams(), 
DataxParameters.class);
+        dataXParameters = JSONUtils.parseObject(taskRequest.getTaskParams(), 
DataxParameters.class);
         log.info("Initialize datax task params {}", 
JSONUtils.toPrettyJsonString(dataXParameters));
 
         if (dataXParameters == null || !dataXParameters.checkParameters()) {
@@ -145,7 +128,7 @@ public class DataxTask extends AbstractTask {
         }
         SensitiveDataConverter.addMaskPattern(POST_JDBC_INFO_REGEX);
         dataxTaskExecutionContext =
-                
dataXParameters.generateExtendedContext(taskExecutionContext.getResourceParametersHelper());
+                
dataXParameters.generateExtendedContext(taskRequest.getResourceParametersHelper());
     }
 
     @SuppressWarnings("unchecked")
@@ -153,7 +136,7 @@ public class DataxTask extends AbstractTask {
     public void handle(TaskCallBack taskCallBack) throws TaskException {
         try {
             // replace placeholder,and combine local and global parameters
-            Map<String, Property> paramsMap = 
taskExecutionContext.getPrepareParamsMap();
+            Map<String, Property> paramsMap = 
taskRequest.getPrepareParamsMap();
 
             IShellInterceptorBuilder<?, ?> shellActuatorBuilder = 
ShellInterceptorBuilderFactory.newBuilder()
                     .properties(ParameterUtils.convert(paramsMap))
@@ -198,9 +181,7 @@ public class DataxTask extends AbstractTask {
      */
     private String buildDataxJsonFile(Map<String, Property> paramsMap) throws 
Exception {
         // generate json
-        String fileName = String.format("%s/%s_job.json",
-                taskExecutionContext.getExecutePath(),
-                taskExecutionContext.getTaskAppId());
+        String fileName = String.format("%s/%s_job.json", 
taskRequest.getExecutePath(), taskRequest.getTaskAppId());
         String json;
 
         Path path = new File(fileName).toPath();
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java
index d1a8bf8b1b..1086563606 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java
@@ -38,37 +38,19 @@ import lombok.extern.slf4j.Slf4j;
 @Slf4j
 public class DvcTask extends AbstractTask {
 
-    /**
-     * dvc parameters
-     */
     private DvcParameters parameters;
 
-    /**
-     * shell command executor
-     */
-    private ShellCommandExecutor shellCommandExecutor;
-
-    /**
-     * taskExecutionContext
-     */
-    private TaskExecutionContext taskExecutionContext;
-
-    /**
-     * constructor
-     *
-     * @param taskExecutionContext taskExecutionContext
-     */
+    private final ShellCommandExecutor shellCommandExecutor;
+
     public DvcTask(TaskExecutionContext taskExecutionContext) {
         super(taskExecutionContext);
-
-        this.taskExecutionContext = taskExecutionContext;
-        this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, 
taskExecutionContext);
+        this.shellCommandExecutor = new 
ShellCommandExecutor(taskExecutionContext);
     }
 
     @Override
     public void init() {
 
-        parameters = 
JSONUtils.parseObject(taskExecutionContext.getTaskParams(), 
DvcParameters.class);
+        parameters = JSONUtils.parseObject(taskRequest.getTaskParams(), 
DvcParameters.class);
         log.info("Initialize dvc task params {}", 
JSONUtils.toPrettyJsonString(parameters));
 
         if (parameters == null || !parameters.checkParameters()) {
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-hivecli/src/main/java/org/apache/dolphinscheduler/plugin/task/hivecli/HiveCliTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-hivecli/src/main/java/org/apache/dolphinscheduler/plugin/task/hivecli/HiveCliTask.java
index 6630bfb8b7..94e562cff1 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-hivecli/src/main/java/org/apache/dolphinscheduler/plugin/task/hivecli/HiveCliTask.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-hivecli/src/main/java/org/apache/dolphinscheduler/plugin/task/hivecli/HiveCliTask.java
@@ -63,13 +63,9 @@ public class HiveCliTask extends AbstractRemoteTask {
 
     private final ShellCommandExecutor shellCommandExecutor;
 
-    private final TaskExecutionContext taskExecutionContext;
-
     public HiveCliTask(TaskExecutionContext taskExecutionContext) {
         super(taskExecutionContext);
-        this.taskExecutionContext = taskExecutionContext;
-
-        this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, 
taskExecutionContext);
+        this.shellCommandExecutor = new 
ShellCommandExecutor(taskExecutionContext);
     }
 
     @Override
@@ -79,9 +75,9 @@ public class HiveCliTask extends AbstractRemoteTask {
 
     @Override
     public void init() {
-        log.info("hiveCli task params {}", 
taskExecutionContext.getTaskParams());
+        log.info("hiveCli task params {}", taskRequest.getTaskParams());
 
-        hiveCliParameters = 
JSONUtils.parseObject(taskExecutionContext.getTaskParams(), 
HiveCliParameters.class);
+        hiveCliParameters = JSONUtils.parseObject(taskRequest.getTaskParams(), 
HiveCliParameters.class);
 
         if (!hiveCliParameters.checkParameters()) {
             throw new TaskException("hiveCli task params is not valid");
@@ -138,7 +134,7 @@ public class HiveCliTask extends AbstractRemoteTask {
 
             try {
                 resourceFileName = resourceInfos.get(0).getResourceName();
-                ResourceContext resourceContext = 
taskExecutionContext.getResourceContext();
+                ResourceContext resourceContext = 
taskRequest.getResourceContext();
                 sqlContent = FileUtils.readFileToString(
                         new 
File(resourceContext.getResourceItem(resourceFileName).getResourceAbsolutePathInLocal()),
                         StandardCharsets.UTF_8);
@@ -150,7 +146,7 @@ public class HiveCliTask extends AbstractRemoteTask {
             sqlContent = hiveCliParameters.getHiveSqlScript();
         }
 
-        final Map<String, Property> paramsMap = 
taskExecutionContext.getPrepareParamsMap();
+        final Map<String, Property> paramsMap = 
taskRequest.getPrepareParamsMap();
         sqlContent = ParameterUtils.convertParameterPlaceholders(sqlContent, 
ParameterUtils.convert(paramsMap));
         log.info("HiveCli sql content: {}", sqlContent);
         String sqlFilePath = generateSqlScriptFile(sqlContent);
@@ -184,8 +180,8 @@ public class HiveCliTask extends AbstractRemoteTask {
     }
 
     protected String generateSqlScriptFile(String rawScript) {
-        String scriptFileName = String.format("%s/%s_node.sql", 
taskExecutionContext.getExecutePath(),
-                taskExecutionContext.getTaskAppId());
+        String scriptFileName = String.format("%s/%s_node.sql", 
taskRequest.getExecutePath(),
+                taskRequest.getTaskAppId());
 
         File file = new File(scriptFileName);
         Path path = file.toPath();
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/main/java/org/apache/dolphinscheduler/plugin/task/java/JavaTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/main/java/org/apache/dolphinscheduler/plugin/task/java/JavaTask.java
index ab32c87dd5..5e958026ca 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/main/java/org/apache/dolphinscheduler/plugin/task/java/JavaTask.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/main/java/org/apache/dolphinscheduler/plugin/task/java/JavaTask.java
@@ -46,25 +46,14 @@ import com.google.common.base.Preconditions;
 @Slf4j
 public class JavaTask extends AbstractTask {
 
-    /**
-     * Contains various parameters for this task
-     */
     private JavaParameters javaParameters;
 
-    /**
-     * To run shell commands
-     */
-    private ShellCommandExecutor shellCommandExecutor;
-
-    /**
-     * task execution context
-     */
-    private TaskExecutionContext taskRequest;
+    private final ShellCommandExecutor shellCommandExecutor;
 
     public JavaTask(TaskExecutionContext taskRequest) {
         super(taskRequest);
         this.taskRequest = taskRequest;
-        this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, 
taskRequest);
+        this.shellCommandExecutor = new ShellCommandExecutor(taskRequest);
     }
 
     /**
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java
index 5f79b139a9..f55da63f32 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java
@@ -50,14 +50,11 @@ public class JupyterTask extends AbstractRemoteTask {
 
     private JupyterParameters jupyterParameters;
 
-    private TaskExecutionContext taskExecutionContext;
-
     private ShellCommandExecutor shellCommandExecutor;
 
     public JupyterTask(TaskExecutionContext taskExecutionContext) {
         super(taskExecutionContext);
-        this.taskExecutionContext = taskExecutionContext;
-        this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, 
taskExecutionContext);
+        this.shellCommandExecutor = new 
ShellCommandExecutor(taskExecutionContext);
     }
 
     @Override
@@ -68,7 +65,7 @@ public class JupyterTask extends AbstractRemoteTask {
     @Override
     public void init() {
 
-        jupyterParameters = 
JSONUtils.parseObject(taskExecutionContext.getTaskParams(), 
JupyterParameters.class);
+        jupyterParameters = JSONUtils.parseObject(taskRequest.getTaskParams(), 
JupyterParameters.class);
         log.info("Initialize jupyter task params {}", 
JSONUtils.toPrettyJsonString(jupyterParameters));
 
         if (null == jupyterParameters) {
@@ -86,7 +83,7 @@ public class JupyterTask extends AbstractRemoteTask {
     public void handle(TaskCallBack taskCallBack) throws TaskException {
         try {
             IShellInterceptorBuilder<?, ?> shellActuatorBuilder = 
ShellInterceptorBuilderFactory.newBuilder()
-                    
.properties(ParameterUtils.convert(taskExecutionContext.getPrepareParamsMap()))
+                    
.properties(ParameterUtils.convert(taskRequest.getPrepareParamsMap()))
                     .appendScript(buildCommand());
 
             TaskResponse response = 
shellCommandExecutor.run(shellActuatorBuilder, taskCallBack);
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-linkis/src/main/java/org/apache/dolphinscheduler/plugin/task/linkis/LinkisTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-linkis/src/main/java/org/apache/dolphinscheduler/plugin/task/linkis/LinkisTask.java
index a4ab0b96a9..8f4a346abd 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-linkis/src/main/java/org/apache/dolphinscheduler/plugin/task/linkis/LinkisTask.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-linkis/src/main/java/org/apache/dolphinscheduler/plugin/task/linkis/LinkisTask.java
@@ -46,20 +46,9 @@ import lombok.extern.slf4j.Slf4j;
 @Slf4j
 public class LinkisTask extends AbstractRemoteTask {
 
-    /**
-     * linkis parameters
-     */
     private LinkisParameters linkisParameters;
 
-    /**
-     * shell command executor
-     */
-    private ShellCommandExecutor shellCommandExecutor;
-
-    /**
-     * taskExecutionContext
-     */
-    protected final TaskExecutionContext taskExecutionContext;
+    private final ShellCommandExecutor shellCommandExecutor;
 
     private String taskId;
 
@@ -67,16 +56,9 @@ public class LinkisTask extends AbstractRemoteTask {
 
     protected static final Pattern LINKIS_STATUS_REGEX = 
Pattern.compile(Constants.LINKIS_STATUS_REGEX);
 
-    /**
-     * constructor
-     *
-     * @param taskExecutionContext taskExecutionContext
-     */
     public LinkisTask(TaskExecutionContext taskExecutionContext) {
         super(taskExecutionContext);
-
-        this.taskExecutionContext = taskExecutionContext;
-        this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, 
taskExecutionContext);
+        this.shellCommandExecutor = new 
ShellCommandExecutor(taskExecutionContext);
     }
 
     @Override
@@ -86,7 +68,7 @@ public class LinkisTask extends AbstractRemoteTask {
 
     @Override
     public void init() {
-        linkisParameters = 
JSONUtils.parseObject(taskExecutionContext.getTaskParams(), 
LinkisParameters.class);
+        linkisParameters = JSONUtils.parseObject(taskRequest.getTaskParams(), 
LinkisParameters.class);
         log.info("Initialize Linkis task params {}", 
JSONUtils.toPrettyJsonString(linkisParameters));
 
         if (!linkisParameters.checkParameters()) {
@@ -99,7 +81,7 @@ public class LinkisTask extends AbstractRemoteTask {
         try {
             // construct process
             IShellInterceptorBuilder<?, ?> shellActuatorBuilder = 
ShellInterceptorBuilderFactory.newBuilder()
-                    
.properties(ParameterUtils.convert(taskExecutionContext.getPrepareParamsMap()))
+                    
.properties(ParameterUtils.convert(taskRequest.getPrepareParamsMap()))
                     .appendScript(buildCommand());
             TaskResponse commandExecuteResult = 
shellCommandExecutor.run(shellActuatorBuilder, null);
             setExitStatusCode(commandExecuteResult.getExitStatusCode());
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java
index bc97be04e6..332879d6ce 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java
@@ -48,30 +48,13 @@ import lombok.extern.slf4j.Slf4j;
 public class MlflowTask extends AbstractTask {
 
     private static final Pattern GIT_CHECK_PATTERN = 
Pattern.compile("^(git@|https?://)");
-    /**
-     * shell command executor
-     */
     private final ShellCommandExecutor shellCommandExecutor;
 
-    /**
-     * taskExecutionContext
-     */
-    private final TaskExecutionContext taskExecutionContext;
-    /**
-     * shell parameters
-     */
     private MlflowParameters mlflowParameters;
 
-    /**
-     * constructor
-     *
-     * @param taskExecutionContext taskExecutionContext
-     */
     public MlflowTask(TaskExecutionContext taskExecutionContext) {
         super(taskExecutionContext);
-
-        this.taskExecutionContext = taskExecutionContext;
-        this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, 
taskExecutionContext);
+        this.shellCommandExecutor = new 
ShellCommandExecutor(taskExecutionContext);
     }
 
     static public String getPresetRepository() {
@@ -105,7 +88,7 @@ public class MlflowTask extends AbstractTask {
     @Override
     public void init() {
 
-        mlflowParameters = 
JSONUtils.parseObject(taskExecutionContext.getTaskParams(), 
MlflowParameters.class);
+        mlflowParameters = JSONUtils.parseObject(taskRequest.getTaskParams(), 
MlflowParameters.class);
 
         log.info("Initialize MLFlow task params {}", 
JSONUtils.toPrettyJsonString(mlflowParameters));
         if (mlflowParameters == null || !mlflowParameters.checkParameters()) {
@@ -254,7 +237,7 @@ public class MlflowTask extends AbstractTask {
     }
 
     private Map<String, Property> getParamsMap() {
-        return taskExecutionContext.getPrepareParamsMap();
+        return taskRequest.getPrepareParamsMap();
 
     }
 
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java
index 715438f6e6..dc4381adf0 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java
@@ -49,22 +49,14 @@ public class PythonTask extends AbstractTask {
 
     protected PythonParameters pythonParameters;
 
-    private ShellCommandExecutor shellCommandExecutor;
-
-    protected TaskExecutionContext taskRequest;
+    private final ShellCommandExecutor shellCommandExecutor;
 
     protected static final String PYTHON_LAUNCHER = "PYTHON_LAUNCHER";
 
-    /**
-     * constructor
-     *
-     * @param taskRequest taskRequest
-     */
     public PythonTask(TaskExecutionContext taskRequest) {
         super(taskRequest);
-        this.taskRequest = taskRequest;
 
-        this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, 
taskRequest);
+        this.shellCommandExecutor = new ShellCommandExecutor(taskRequest);
     }
 
     @Override
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTask.java
index 1d7a874dfc..14422ba4da 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTask.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTask.java
@@ -41,21 +41,19 @@ public class PytorchTask extends AbstractTask {
 
     private final ShellCommandExecutor shellCommandExecutor;
     protected PytorchParameters pytorchParameters;
-    protected TaskExecutionContext taskExecutionContext;
     private PythonEnvManager pythonEnvManager;
 
     public PytorchTask(TaskExecutionContext taskExecutionContext) {
         super(taskExecutionContext);
-        this.taskExecutionContext = taskExecutionContext;
 
-        this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, 
taskExecutionContext);
+        this.shellCommandExecutor = new 
ShellCommandExecutor(taskExecutionContext);
     }
 
     @Override
     public void init() {
 
-        pytorchParameters = 
JSONUtils.parseObject(taskExecutionContext.getTaskParams(), 
PytorchParameters.class);
-        log.info("Initialize pytorch task params {}", 
JSONUtils.toPrettyJsonString(taskExecutionContext));
+        pytorchParameters = JSONUtils.parseObject(taskRequest.getTaskParams(), 
PytorchParameters.class);
+        log.info("Initialize pytorch task params {}", 
JSONUtils.toPrettyJsonString(taskRequest));
 
         if (pytorchParameters == null || !pytorchParameters.checkParameters()) 
{
             throw new TaskException("python task params is not valid");
@@ -71,7 +69,7 @@ public class PytorchTask extends AbstractTask {
     public void handle(TaskCallBack taskCallBack) throws TaskException {
         try {
             IShellInterceptorBuilder<?, ?> shellActuatorBuilder = 
ShellInterceptorBuilderFactory.newBuilder()
-                    
.properties(ParameterUtils.convert(taskExecutionContext.getPrepareParamsMap()))
+                    
.properties(ParameterUtils.convert(taskRequest.getPrepareParamsMap()))
                     .appendScript(buildPythonExecuteCommand());
 
             TaskResponse taskResponse = 
shellCommandExecutor.run(shellActuatorBuilder, taskCallBack);
@@ -102,7 +100,7 @@ public class PytorchTask extends AbstractTask {
         if (GitProjectManager.isGitPath(pythonPath)) {
             GitProjectManager gpm = new GitProjectManager();
             gpm.setPath(pythonPath);
-            gpm.setBaseDir(taskExecutionContext.getExecutePath());
+            gpm.setBaseDir(taskRequest.getExecutePath());
             gpm.prepareProject();
             pytorchParameters.setPythonPath(gpm.getGitLocalPath());
         }
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java
index da9efbfec0..c0b37057ea 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java
@@ -57,31 +57,14 @@ public class SeatunnelTask extends AbstractRemoteTask {
 
     private static final String SEATUNNEL_BIN_DIR = "${SEATUNNEL_HOME}/bin/";
 
-    /**
-     * seatunnel parameters
-     */
     private SeatunnelParameters seatunnelParameters;
 
-    /**
-     * shell command executor
-     */
-    private ShellCommandExecutor shellCommandExecutor;
-
-    /**
-     * taskExecutionContext
-     */
-    protected final TaskExecutionContext taskExecutionContext;
-
-    /**
-     * constructor
-     *
-     * @param taskExecutionContext taskExecutionContext
-     */
+    private final ShellCommandExecutor shellCommandExecutor;
+
     public SeatunnelTask(TaskExecutionContext taskExecutionContext) {
         super(taskExecutionContext);
 
-        this.taskExecutionContext = taskExecutionContext;
-        this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, 
taskExecutionContext);
+        this.shellCommandExecutor = new 
ShellCommandExecutor(taskExecutionContext);
     }
 
     @Override
@@ -163,7 +146,7 @@ public class SeatunnelTask extends AbstractRemoteTask {
             scriptContent = buildCustomConfigContent();
         } else {
             String resourceFileName = 
seatunnelParameters.getResourceList().get(0).getResourceName();
-            ResourceContext resourceContext = 
taskExecutionContext.getResourceContext();
+            ResourceContext resourceContext = taskRequest.getResourceContext();
             scriptContent = FileUtils.readFileToString(
                     new 
File(resourceContext.getResourceItem(resourceFileName).getResourceAbsolutePathInLocal()),
                     StandardCharsets.UTF_8);
@@ -177,8 +160,8 @@ public class SeatunnelTask extends AbstractRemoteTask {
 
     private List<String> generateTaskParameters() {
         Map<String, String> variables = new HashMap<>();
-        Map<String, Property> paramsMap = 
taskExecutionContext.getPrepareParamsMap();
-        List<Property> propertyList = 
JSONUtils.toList(taskExecutionContext.getGlobalParams(), Property.class);
+        Map<String, Property> paramsMap = taskRequest.getPrepareParamsMap();
+        List<Property> propertyList = 
JSONUtils.toList(taskRequest.getGlobalParams(), Property.class);
         if (propertyList != null && !propertyList.isEmpty()) {
             for (Property property : propertyList) {
                 variables.put(property.getProp(), 
paramsMap.get(property.getProp()).getValue());
@@ -208,8 +191,8 @@ public class SeatunnelTask extends AbstractRemoteTask {
     }
 
     private String buildConfigFilePath() {
-        return String.format("%s/seatunnel_%s.%s", 
taskExecutionContext.getExecutePath(),
-                taskExecutionContext.getTaskAppId(), formatDetector());
+        return String.format("%s/seatunnel_%s.%s", 
taskRequest.getExecutePath(),
+                taskRequest.getTaskAppId(), formatDetector());
     }
 
     private String formatDetector() {
@@ -218,8 +201,8 @@ public class SeatunnelTask extends AbstractRemoteTask {
     }
 
     private void createConfigFileIfNotExists(String script, String scriptFile) 
throws IOException {
-        log.info("tenantCode :{}, task dir:{}", 
taskExecutionContext.getTenantCode(),
-                taskExecutionContext.getExecutePath());
+        log.info("tenantCode :{}, task dir:{}", taskRequest.getTenantCode(),
+                taskRequest.getExecutePath());
 
         if (!Files.exists(Paths.get(scriptFile))) {
             log.info("generate script file:{}", scriptFile);
@@ -235,7 +218,7 @@ public class SeatunnelTask extends AbstractRemoteTask {
     }
 
     private String parseScript(String script) {
-        Map<String, Property> paramsMap = 
taskExecutionContext.getPrepareParamsMap();
+        Map<String, Property> paramsMap = taskRequest.getPrepareParamsMap();
         return ParameterUtils.convertParameterPlaceholders(script, 
ParameterUtils.convert(paramsMap));
     }
 
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/flink/SeatunnelFlinkTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/flink/SeatunnelFlinkTask.java
index 7918c6d183..492b592f70 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/flink/SeatunnelFlinkTask.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/flink/SeatunnelFlinkTask.java
@@ -36,7 +36,7 @@ public class SeatunnelFlinkTask extends SeatunnelTask {
     @Override
     public void init() {
         seatunnelParameters =
-                JSONUtils.parseObject(taskExecutionContext.getTaskParams(), 
SeatunnelFlinkParameters.class);
+                JSONUtils.parseObject(taskRequest.getTaskParams(), 
SeatunnelFlinkParameters.class);
         setSeatunnelParameters(seatunnelParameters);
         super.init();
     }
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/self/SeatunnelEngineTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/self/SeatunnelEngineTask.java
index 1507653da9..f870dfdcd9 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/self/SeatunnelEngineTask.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/self/SeatunnelEngineTask.java
@@ -37,7 +37,7 @@ public class SeatunnelEngineTask extends SeatunnelTask {
     @Override
     public void init() {
         seatunnelParameters =
-                JSONUtils.parseObject(taskExecutionContext.getTaskParams(), 
SeatunnelEngineParameters.class);
+                JSONUtils.parseObject(taskRequest.getTaskParams(), 
SeatunnelEngineParameters.class);
         setSeatunnelParameters(seatunnelParameters);
         super.init();
     }
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/spark/SeatunnelSparkTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/spark/SeatunnelSparkTask.java
index d8b1a69156..cb1e6aeb3e 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/spark/SeatunnelSparkTask.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/spark/SeatunnelSparkTask.java
@@ -43,7 +43,7 @@ public class SeatunnelSparkTask extends SeatunnelTask {
     @Override
     public void init() {
         seatunnelParameters =
-                JSONUtils.parseObject(taskExecutionContext.getTaskParams(), 
SeatunnelSparkParameters.class);
+                JSONUtils.parseObject(taskRequest.getTaskParams(), 
SeatunnelSparkParameters.class);
         setSeatunnelParameters(seatunnelParameters);
         super.init();
     }
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java
index 0d90ab998b..ccb8efb492 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java
@@ -36,37 +36,20 @@ import lombok.extern.slf4j.Slf4j;
 @Slf4j
 public class ShellTask extends AbstractTask {
 
-    /**
-     * shell parameters
-     */
     private ShellParameters shellParameters;
 
-    /**
-     * shell command executor
-     */
-    private ShellCommandExecutor shellCommandExecutor;
-
-    /**
-     * taskExecutionContext
-     */
-    private TaskExecutionContext taskExecutionContext;
-
-    /**
-     * constructor
-     *
-     * @param taskExecutionContext taskExecutionContext
-     */
+    private final ShellCommandExecutor shellCommandExecutor;
+
     public ShellTask(TaskExecutionContext taskExecutionContext) {
         super(taskExecutionContext);
 
-        this.taskExecutionContext = taskExecutionContext;
-        this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, 
taskExecutionContext);
+        this.shellCommandExecutor = new 
ShellCommandExecutor(taskExecutionContext);
     }
 
     @Override
     public void init() {
 
-        shellParameters = 
JSONUtils.parseObject(taskExecutionContext.getTaskParams(), 
ShellParameters.class);
+        shellParameters = JSONUtils.parseObject(taskRequest.getTaskParams(), 
ShellParameters.class);
         log.info("Initialize shell task params {}", 
JSONUtils.toPrettyJsonString(shellParameters));
 
         if (shellParameters == null || !shellParameters.checkParameters()) {
@@ -79,14 +62,14 @@ public class ShellTask extends AbstractTask {
     public void handle(TaskCallBack taskCallBack) throws TaskException {
         try {
             IShellInterceptorBuilder<?, ?> shellActuatorBuilder = 
ShellInterceptorBuilderFactory.newBuilder()
-                    
.properties(ParameterUtils.convert(taskExecutionContext.getPrepareParamsMap()))
+                    
.properties(ParameterUtils.convert(taskRequest.getPrepareParamsMap()))
                     .appendScript(shellParameters.getRawScript());
 
             TaskResponse commandExecuteResult = 
shellCommandExecutor.run(shellActuatorBuilder, taskCallBack);
             setExitStatusCode(commandExecuteResult.getExitStatusCode());
             setProcessId(commandExecuteResult.getProcessId());
             
shellParameters.dealOutParam(shellCommandExecutor.getTaskOutputParams());
-            taskExecutionContext.setVarPool(shellParameters.getVarPool());
+            taskRequest.setVarPool(shellParameters.getVarPool());
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             log.error("The current Shell task has been interrupted", e);

Reply via email to