This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 71ee1f0c3d Support parse task output params under multiple log (#15244)
71ee1f0c3d is described below

commit 71ee1f0c3d2774e4100dc78829f5139bf6e0e71c
Author: Wenjun Ruan <[email protected]>
AuthorDate: Fri Dec 1 10:16:36 2023 +0800

    Support parse task output params under multiple log (#15244)
---
 docs/docs/en/guide/parameter/context.md            |   2 +-
 docs/docs/zh/guide/parameter/context.md            |   2 +-
 .../plugin/task/api/AbstractCommandExecutor.java   |  80 +++++----------
 .../plugin/task/api/AbstractTask.java              |  26 ++---
 .../plugin/task/api/AbstractYarnTask.java          |   7 +-
 .../plugin/task/api/ShellCommandExecutor.java      |  10 +-
 .../plugin/task/api/TaskConstants.java             |   4 -
 .../plugin/task/api/k8s/AbstractK8sTask.java       |  11 ++-
 .../task/api/k8s/AbstractK8sTaskExecutor.java      |  16 +--
 .../plugin/task/api/k8s/impl/K8sTaskExecutor.java  |  18 ++--
 .../plugin/task/api/loop/BaseLoopTaskExecutor.java |   2 +
 .../task/api/parameters/AbstractParameters.java    |  38 ++------
 .../plugin/task/api/parameters/SqlParameters.java  |   1 -
 .../task/api/parser/TaskOutputParameterParser.java | 107 +++++++++++++++++++++
 .../plugin/task/api/utils/VarPoolUtils.java        |  47 ---------
 .../plugin/task/api/AbstractTaskTest.java          |  20 ----
 .../plugin/task/api/k8s/K8sTaskExecutorTest.java   |   9 +-
 .../api/parser/TaskOutputParameterParserTest.java  |  77 +++++++++++++++
 .../plugin/task/api/utils/VarPoolUtilsTest.java    |  50 ----------
 .../test/resources/outputParam/emptyVarPoolLog.txt |  16 +++
 .../test/resources/outputParam/multipleVarPoll.txt |  19 ++++
 .../outputParam/oneVarPollInMultiLineLog.txt       |  21 ++++
 .../resources/outputParam/onelineVarPoolLog.txt    |  18 ++++
 .../plugin/task/chunjun/ChunJunTask.java           |   9 +-
 .../plugin/task/datafactory/DatafactoryTask.java   |   2 +
 .../plugin/task/dq/DataQualityTask.java            |   3 +
 .../plugin/task/datasync/DatasyncTask.java         |   2 +
 .../plugin/task/datax/DataxTask.java               |   6 +-
 .../plugin/task/dinky/DinkyTask.java               |  14 +--
 .../dolphinscheduler/plugin/task/dms/DmsTask.java  |   3 +
 .../dolphinscheduler/plugin/task/dvc/DvcTask.java  |  10 +-
 .../plugin/task/emr/AbstractEmrTask.java           |   3 +
 .../plugin/task/emr/EmrAddStepsTask.java           |   3 +
 .../plugin/task/emr/EmrJobFlowTask.java            |   3 +
 .../plugin/task/flink/FlinkStreamTask.java         |  11 +--
 .../plugin/task/flink/FlinkTask.java               |   3 +
 .../plugin/task/hivecli/HiveCliTask.java           |   9 +-
 .../plugin/task/http/HttpTask.java                 |   3 +
 .../plugin/task/java/JavaTask.java                 |   9 +-
 .../plugin/task/jupyter/JupyterTask.java           |   7 +-
 .../dolphinscheduler/plugin/task/k8s/K8sTask.java  |  15 +--
 .../plugin/task/k8s/K8sTaskTest.java               |   4 +-
 .../plugin/kubeflow/KubeflowTask.java              |   3 +
 .../plugin/task/linkis/LinkisTask.java             |  12 +--
 .../plugin/task/mlflow/MlflowTask.java             |  10 +-
 .../plugin/task/mr/MapReduceTask.java              |  18 +---
 .../plugin/task/openmldb/OpenmldbTask.java         |   6 +-
 .../plugin/task/pigeon/PigeonTask.java             |   3 +
 .../plugin/task/procedure/ProcedureTask.java       |  18 ++--
 .../plugin/task/python/PythonTask.java             |  20 ++--
 .../plugin/task/pytorch/PytorchTask.java           |   9 +-
 .../plugin/task/remoteshell/RemoteExecutor.java    |  73 +++++---------
 .../plugin/task/remoteshell/RemoteShellTask.java   |   8 +-
 .../plugin/task/sagemaker/SagemakerTask.java       |   3 +
 .../plugin/task/seatunnel/SeatunnelTask.java       |  12 +--
 .../plugin/task/shell/ShellTask.java               |  12 +--
 .../plugin/task/spark/SparkTask.java               |  10 +-
 .../dolphinscheduler/plugin/task/sql/SqlTask.java  |  16 +--
 .../plugin/task/sqoop/SqoopTask.java               |   9 +-
 .../plugin/task/zeppelin/ZeppelinTask.java         |  13 +--
 60 files changed, 495 insertions(+), 480 deletions(-)

diff --git a/docs/docs/en/guide/parameter/context.md 
b/docs/docs/en/guide/parameter/context.md
index 15cf0f4aed..9d6131d92e 100644
--- a/docs/docs/en/guide/parameter/context.md
+++ b/docs/docs/en/guide/parameter/context.md
@@ -122,7 +122,7 @@ Although the two parameters var1 and var2 are output in the 
A task, only the `OU
 
 #### Pass parameter from Kubernetes task to downstream
 
-Different programming languages may use different logging frameworks in 
Kubernetes tasks. To be compatible with these frameworks, DolphinScheduler 
provides a universal logging data format `${(key=value)dsVal}`. Users can 
output log data in the format `${(key=value)dsVal}` in the terminal logs of 
their applications, where `key` is the corresponding parameter prop and `value` 
is the value of that parameter. DolphinScheduler will capture the 
`${(key=value)dsVal}` in the output logs to captu [...]
+Different programming languages may use different logging frameworks in 
Kubernetes tasks. To be compatible with these frameworks, DolphinScheduler 
provides a universal logging data format `${(key=value)}` or `#{(key=value)}`. 
Users can output log data in the format in the terminal logs of their 
applications, where `key` is the corresponding parameter prop and `value` is 
the value of that parameter. DolphinScheduler will capture the `${(key=value)}` 
or `#{(key=value)}` in the output logs  [...]
 
 For example
 
diff --git a/docs/docs/zh/guide/parameter/context.md 
b/docs/docs/zh/guide/parameter/context.md
index f02f774142..7f5870458d 100644
--- a/docs/docs/zh/guide/parameter/context.md
+++ b/docs/docs/zh/guide/parameter/context.md
@@ -121,7 +121,7 @@ Node_mysql 运行结果如下:
 
 #### Kubernetes 任务传递参数
 
-在Kubernetes任务中不同的程序开发语言可能会采用不同的日志框架,DolphinScheduler为了兼容不同的日志框架,提供了一种通用的日志数据格式,即
 `${(key=value)dsVal}`,用户可以在应用程序的终端日志中输出以格式为 `${(key=value)dsVal}` 结束的日志数据,key 
为对应参数的 prop,value 为该参数的值。DolphinScheduler会捕捉输出日志中的 
`${(key=value)dsVal}`来进行参数捕捉,从而传递到下游。
+在Kubernetes任务中不同的程序开发语言可能会采用不同的日志框架,DolphinScheduler为了兼容不同的日志框架,提供了一种通用的日志数据格式,即
 `${(key=value)}` 或 `#{(key=value)}`,用户可以在应用程序的终端日志中输出以这种格式的日志数据,key 为对应参数的 
prop,value 为该参数的值。DolphinScheduler会捕捉输出日志中的 `${(key=value)}` 和 `#{(key=value)}` 
来进行参数捕捉,从而传递到下游。
 
 如下图所示:
 
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
index eef4c98e69..e54df42944 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
@@ -27,6 +27,7 @@ import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.common.utils.OSUtils;
 import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
 import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
+import 
org.apache.dolphinscheduler.plugin.task.api.parser.TaskOutputParameterParser;
 import org.apache.dolphinscheduler.plugin.task.api.shell.IShellInterceptor;
 import 
org.apache.dolphinscheduler.plugin.task.api.shell.IShellInterceptorBuilder;
 import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
@@ -39,31 +40,25 @@ import org.apache.commons.lang3.StringUtils;
 import java.io.BufferedReader;
 import java.io.InputStreamReader;
 import java.lang.reflect.Field;
-import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.slf4j.Logger;
 
+import lombok.extern.slf4j.Slf4j;
 import io.fabric8.kubernetes.client.dsl.LogWatch;
 
 /**
  * abstract command executor
  */
+@Slf4j
 public abstract class AbstractCommandExecutor {
 
-    /**
-     * rules for extracting Var Pool
-     */
-    protected static final Pattern SETVALUE_REGEX = 
Pattern.compile(TaskConstants.SETVALUE_REGEX);
-
-    protected StringBuilder varPool = new StringBuilder();
+    protected volatile Map<String, String> taskOutputParams = new HashMap<>();
     /**
      * process
      */
@@ -74,11 +69,6 @@ public abstract class AbstractCommandExecutor {
      */
     protected Consumer<LinkedBlockingQueue<String>> logHandler;
 
-    /**
-     * logger
-     */
-    protected Logger logger;
-
     /**
      * log list
      */
@@ -98,11 +88,9 @@ public abstract class AbstractCommandExecutor {
     protected Future<?> podLogOutputFuture;
 
     public AbstractCommandExecutor(Consumer<LinkedBlockingQueue<String>> 
logHandler,
-                                   TaskExecutionContext taskRequest,
-                                   Logger logger) {
+                                   TaskExecutionContext taskRequest) {
         this.logHandler = logHandler;
         this.taskRequest = taskRequest;
-        this.logger = logger;
         this.logBuffer = new LinkedBlockingQueue<>();
         this.logBuffer.add(EMPTY_STRING);
 
@@ -119,7 +107,7 @@ public abstract class AbstractCommandExecutor {
         TaskResponse result = new TaskResponse();
         int taskInstanceId = taskRequest.getTaskInstanceId();
         if (null == 
TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId)) {
-            logger.warn(
+            log.warn(
                     "Cannot find the taskInstance: {} from 
TaskExecutionContextCacheManager, the task might already been killed",
                     taskInstanceId);
             result.setExitStatusCode(EXIT_CODE_KILL);
@@ -180,7 +168,7 @@ public abstract class AbstractCommandExecutor {
             return result;
         }
         // print process id
-        logger.info("process start, process id is: {}", processId);
+        log.info("process start, process id is: {}", processId);
 
         // if timeout occurs, exit directly
         long remainTime = getRemainTime();
@@ -201,7 +189,7 @@ public abstract class AbstractCommandExecutor {
                 // Wait the task log process finished.
                 taskOutputFuture.get();
             } catch (ExecutionException e) {
-                logger.error("Handle task log error", e);
+                log.error("Handle task log error", e);
             }
         }
 
@@ -212,7 +200,7 @@ public abstract class AbstractCommandExecutor {
                 // delete pod after successful execution and log collection
                 ProcessUtils.cancelApplication(taskRequest);
             } catch (ExecutionException e) {
-                logger.error("Handle pod log error", e);
+                log.error("Handle pod log error", e);
             }
         }
 
@@ -223,21 +211,21 @@ public abstract class AbstractCommandExecutor {
             result.setExitStatusCode(this.process.exitValue());
 
         } else {
-            logger.error("process has failure, the task timeout configuration 
value is:{}, ready to kill ...",
+            log.error("process has failure, the task timeout configuration 
value is:{}, ready to kill ...",
                     taskRequest.getTaskTimeout());
             result.setExitStatusCode(EXIT_CODE_FAILURE);
             cancelApplication();
         }
         int exitCode = this.process.exitValue();
         String exitLogMessage = EXIT_CODE_KILL == exitCode ? "process has 
killed." : "process has exited.";
-        logger.info("{} execute path:{}, processId:{} ,exitStatusCode:{} 
,processWaitForStatus:{} ,processExitValue:{}",
+        log.info("{} execute path:{}, processId:{} ,exitStatusCode:{} 
,processWaitForStatus:{} ,processExitValue:{}",
                 exitLogMessage, taskRequest.getExecutePath(), processId, 
result.getExitStatusCode(), status, exitCode);
         return result;
 
     }
 
-    public String getVarPool() {
-        return varPool.toString();
+    public Map<String, String> getTaskOutputParams() {
+        return taskOutputParams;
     }
 
     public void cancelApplication() throws InterruptedException {
@@ -246,16 +234,12 @@ public abstract class AbstractCommandExecutor {
         }
 
         // soft kill
-        logger.info("Begin to kill process process, pid is : {}", 
taskRequest.getProcessId());
+        log.info("Begin to kill process process, pid is : {}", 
taskRequest.getProcessId());
         process.destroy();
         if (!process.waitFor(5, TimeUnit.SECONDS)) {
             process.destroyForcibly();
         }
-        logger.info("Success kill task: {}, pid: {}", 
taskRequest.getTaskAppId(), taskRequest.getProcessId());
-    }
-
-    private void printCommand(List<String> commands) {
-        logger.info("task run command: {}", String.join(" ", commands));
+        log.info("Success kill task: {}, pid: {}", taskRequest.getTaskAppId(), 
taskRequest.getProcessId());
     }
 
     private void collectPodLogIfNeeded() {
@@ -299,24 +283,22 @@ public abstract class AbstractCommandExecutor {
         ExecutorService getOutputLogService = ThreadUtils
                 
.newSingleDaemonScheduledExecutorService("ResolveOutputLog-thread-" + 
taskRequest.getTaskName());
         getOutputLogService.submit(() -> {
+            TaskOutputParameterParser taskOutputParameterParser = new 
TaskOutputParameterParser();
             try (BufferedReader inReader = new BufferedReader(new 
InputStreamReader(process.getInputStream()))) {
                 
LogUtils.setTaskInstanceLogFullPathMDC(taskRequest.getLogPath());
                 String line;
                 while ((line = inReader.readLine()) != null) {
-                    if (line.startsWith("${setValue(") || 
line.startsWith("#{setValue(")) {
-                        varPool.append(findVarPool(line));
-                        varPool.append("$VarPool$");
-                    } else {
-                        logBuffer.add(line);
-                    }
+                    logBuffer.add(line);
+                    taskOutputParameterParser.appendParseLog(line);
                 }
                 processLogOutputIsSuccess = true;
             } catch (Exception e) {
-                logger.error("Parse var pool error", e);
+                log.error("Parse var pool error", e);
                 processLogOutputIsSuccess = true;
             } finally {
                 LogUtils.removeTaskInstanceLogFullPathMDC();
             }
+            taskOutputParams = taskOutputParameterParser.getTaskOutputParams();
         });
 
         getOutputLogService.shutdown();
@@ -336,7 +318,7 @@ public abstract class AbstractCommandExecutor {
                     }
                 }
             } catch (Exception e) {
-                logger.error("Output task log error", e);
+                log.error("Output task log error", e);
             } finally {
                 LogUtils.removeTaskInstanceLogFullPathMDC();
             }
@@ -344,20 +326,6 @@ public abstract class AbstractCommandExecutor {
         parseProcessOutputExecutorService.shutdown();
     }
 
-    /**
-     * find var pool
-     *
-     * @param line
-     * @return
-     */
-    private String findVarPool(String line) {
-        Matcher matcher = SETVALUE_REGEX.matcher(line);
-        if (matcher.find()) {
-            return matcher.group(1);
-        }
-        return null;
-    }
-
     /**
      * get remain time(s)
      *
@@ -389,7 +357,7 @@ public abstract class AbstractCommandExecutor {
 
             processId = f.getInt(process);
         } catch (Exception e) {
-            logger.error("Get task pid failed", e);
+            log.error("Get task pid failed", e);
         }
 
         return processId;
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTask.java
index 95ae22f272..4437df763b 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTask.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTask.java
@@ -28,24 +28,20 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
 
-/**
- * executive task
- */
+@Slf4j
 public abstract class AbstractTask {
 
-    protected final Logger log = LoggerFactory.getLogger(AbstractTask.class);
-
     private static String groupName1 = "paramName1";
     private static String groupName2 = "paramName2";
     public String rgex = 
String.format("['\"]\\$\\{(?<%s>.*?)}['\"]|\\$\\{(?<%s>.*?)}", groupName1, 
groupName2);
 
-    /**
-     * varPool string
-     */
-    protected String varPool;
+    @Getter
+    @Setter
+    protected Map<String, String> taskOutputParams;
 
     /**
      * taskExecutionContext
@@ -91,14 +87,6 @@ public abstract class AbstractTask {
 
     public abstract void cancel() throws TaskException;
 
-    public void setVarPool(String varPool) {
-        this.varPool = varPool;
-    }
-
-    public String getVarPool() {
-        return varPool;
-    }
-
     /**
      * get exit status code
      *
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java
index 69f6aceb99..716947faa2 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java
@@ -29,15 +29,16 @@ import 
org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
 import java.util.List;
 import java.util.Map;
 
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
 public abstract class AbstractYarnTask extends AbstractRemoteTask {
 
     private ShellCommandExecutor shellCommandExecutor;
 
     public AbstractYarnTask(TaskExecutionContext taskRequest) {
         super(taskRequest);
-        this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
-                taskRequest,
-                log);
+        this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, 
taskRequest);
     }
 
     // todo split handle to submit and track
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/ShellCommandExecutor.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/ShellCommandExecutor.java
index 2dbea62287..c9834adb86 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/ShellCommandExecutor.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/ShellCommandExecutor.java
@@ -20,17 +20,11 @@ package org.apache.dolphinscheduler.plugin.task.api;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.function.Consumer;
 
-import org.slf4j.Logger;
-
-/**
- * shell command executor
- */
 public class ShellCommandExecutor extends AbstractCommandExecutor {
 
     public ShellCommandExecutor(Consumer<LinkedBlockingQueue<String>> 
logHandler,
-                                TaskExecutionContext taskRequest,
-                                Logger logger) {
-        super(logHandler, taskRequest, logger);
+                                TaskExecutionContext taskRequest) {
+        super(logHandler, taskRequest);
     }
 
 }
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java
index 97f2943950..83f4b3a678 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java
@@ -35,10 +35,6 @@ public class TaskConstants {
 
     public static final String FLINK_APPLICATION_REGEX = "JobID \\w+";
 
-    public static final String SETVALUE_REGEX = 
"[\\$#]\\{setValue\\((.*?)\\)}";
-
-    public static final String DSVALUE_REGEX = "[\\$#]\\{\\((.*?)\\)dsVal}$";
-
     /**
      * string false
      */
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTask.java
index 1d9784ace4..f77170f482 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTask.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTask.java
@@ -24,6 +24,11 @@ import 
org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.k8s.impl.K8sTaskExecutor;
 import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
 
+import java.util.Map;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
 public abstract class AbstractK8sTask extends AbstractRemoteTask {
 
     /**
@@ -37,7 +42,7 @@ public abstract class AbstractK8sTask extends 
AbstractRemoteTask {
      */
     protected AbstractK8sTask(TaskExecutionContext taskRequest) {
         super(taskRequest);
-        this.abstractK8sTaskExecutor = new K8sTaskExecutor(log, taskRequest);
+        this.abstractK8sTaskExecutor = new K8sTaskExecutor(taskRequest);
     }
 
     // todo split handle to submit and track
@@ -47,7 +52,7 @@ public abstract class AbstractK8sTask extends 
AbstractRemoteTask {
             TaskResponse response = 
abstractK8sTaskExecutor.run(buildCommand());
             setExitStatusCode(response.getExitStatusCode());
             setAppIds(response.getAppIds());
-            dealOutParam(abstractK8sTaskExecutor.getVarPool());
+            dealOutParam(abstractK8sTaskExecutor.getTaskOutputParams());
         } catch (Exception e) {
             log.error("k8s task submit failed with error");
             exitStatusCode = -1;
@@ -86,5 +91,5 @@ public abstract class AbstractK8sTask extends 
AbstractRemoteTask {
      */
     protected abstract String buildCommand();
 
-    protected abstract void dealOutParam(String result);
+    protected abstract void dealOutParam(Map<String, String> taskOutputParams);
 }
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTaskExecutor.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTaskExecutor.java
index 6ad8869d33..1313dc23a6 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTaskExecutor.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTaskExecutor.java
@@ -22,25 +22,25 @@ import 
org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
 import org.apache.dolphinscheduler.plugin.task.api.utils.K8sUtils;
 
-import org.slf4j.Logger;
+import java.util.HashMap;
+import java.util.Map;
+
 import org.yaml.snakeyaml.Yaml;
 
 public abstract class AbstractK8sTaskExecutor {
 
-    protected Logger log;
     protected TaskExecutionContext taskRequest;
     protected K8sUtils k8sUtils;
     protected Yaml yaml;
-    protected StringBuilder varPool;
-    protected AbstractK8sTaskExecutor(Logger log, TaskExecutionContext 
taskRequest) {
-        this.log = log;
+    protected volatile Map<String, String> taskOutputParams;
+    protected AbstractK8sTaskExecutor(TaskExecutionContext taskRequest) {
         this.taskRequest = taskRequest;
         this.k8sUtils = new K8sUtils();
         this.yaml = new Yaml();
-        this.varPool = new StringBuilder();
+        this.taskOutputParams = new HashMap<>();
     }
-    public String getVarPool() {
-        return varPool.toString();
+    public Map<String, String> getTaskOutputParams() {
+        return taskOutputParams;
     }
 
     public abstract TaskResponse run(String k8sParameterStr) throws Exception;
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java
index 66665512b6..476ed85be9 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java
@@ -44,10 +44,10 @@ import 
org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
 import org.apache.dolphinscheduler.plugin.task.api.k8s.AbstractK8sTaskExecutor;
 import org.apache.dolphinscheduler.plugin.task.api.k8s.K8sTaskMainParameters;
 import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
+import 
org.apache.dolphinscheduler.plugin.task.api.parser.TaskOutputParameterParser;
 import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
 import org.apache.dolphinscheduler.plugin.task.api.utils.MapUtils;
 import org.apache.dolphinscheduler.plugin.task.api.utils.ProcessUtils;
-import org.apache.dolphinscheduler.plugin.task.api.utils.VarPoolUtils;
 
 import org.apache.commons.lang3.StringUtils;
 
@@ -64,8 +64,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
-import org.slf4j.Logger;
-
+import lombok.extern.slf4j.Slf4j;
 import io.fabric8.kubernetes.api.model.Affinity;
 import io.fabric8.kubernetes.api.model.AffinityBuilder;
 import io.fabric8.kubernetes.api.model.EnvVar;
@@ -84,14 +83,15 @@ import io.fabric8.kubernetes.client.dsl.LogWatch;
 /**
  * K8sTaskExecutor used to submit k8s task to K8S
  */
+@Slf4j
 public class K8sTaskExecutor extends AbstractK8sTaskExecutor {
 
     private Job job;
     protected boolean podLogOutputIsFinished = false;
     protected Future<?> podLogOutputFuture;
 
-    public K8sTaskExecutor(Logger logger, TaskExecutionContext taskRequest) {
-        super(logger, taskRequest);
+    public K8sTaskExecutor(TaskExecutionContext taskRequest) {
+        super(taskRequest);
     }
 
     public void buildK8sJob(K8sTaskMainParameters k8STaskMainParameters) {
@@ -255,6 +255,7 @@ public class K8sTaskExecutor extends 
AbstractK8sTaskExecutor {
         String taskName = taskRequest.getTaskName().toLowerCase(Locale.ROOT);
         String containerName = String.format("%s-%s", taskName, 
taskInstanceId);
         podLogOutputFuture = collectPodLogExecutorService.submit(() -> {
+            TaskOutputParameterParser taskOutputParameterParser = new 
TaskOutputParameterParser();
             try (
                     LogWatch watcher = 
ProcessUtils.getPodLogWatcher(taskRequest.getK8sTaskExecutionContext(),
                             taskRequest.getTaskAppId(), containerName)) {
@@ -263,11 +264,7 @@ public class K8sTaskExecutor extends 
AbstractK8sTaskExecutor {
                 try (BufferedReader reader = new BufferedReader(new 
InputStreamReader(watcher.getOutput()))) {
                     while ((line = reader.readLine()) != null) {
                         log.info("[K8S-pod-log] {}", line);
-
-                        if (line.endsWith(VarPoolUtils.VAR_SUFFIX)) {
-                            varPool.append(VarPoolUtils.findVarPool(line));
-                            varPool.append(VarPoolUtils.VAR_DELIMITER);
-                        }
+                        taskOutputParameterParser.appendParseLog(line);
                     }
                 }
             } catch (Exception e) {
@@ -276,6 +273,7 @@ public class K8sTaskExecutor extends 
AbstractK8sTaskExecutor {
                 LogUtils.removeTaskInstanceLogFullPathMDC();
                 podLogOutputIsFinished = true;
             }
+            taskOutputParams = taskOutputParameterParser.getTaskOutputParams();
         });
 
         collectPodLogExecutorService.shutdown();
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/loop/BaseLoopTaskExecutor.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/loop/BaseLoopTaskExecutor.java
index 7b75def685..4d52a38f73 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/loop/BaseLoopTaskExecutor.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/loop/BaseLoopTaskExecutor.java
@@ -30,12 +30,14 @@ import java.time.Duration;
 import javax.annotation.Nullable;
 
 import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
 
 /**
  * This class is the base class for all loop task type.
  * <p>
  * The loop task type means, we will submit a task, and loop the task status 
until the task is finished.
  */
+@Slf4j
 public abstract class BaseLoopTaskExecutor extends AbstractRemoteTask {
 
     /**
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/AbstractParameters.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/AbstractParameters.java
index 78812fcb24..a57eececf5 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/AbstractParameters.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/AbstractParameters.java
@@ -24,21 +24,21 @@ import 
org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
 import 
org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper;
 
 import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.collections4.MapUtils;
 import org.apache.commons.lang3.StringUtils;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
+import lombok.extern.slf4j.Slf4j;
+
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.ArrayNode;
 
-/**
- * job params related class
- */
+@Slf4j
 public abstract class AbstractParameters implements IParameters {
 
     @Override
@@ -130,7 +130,7 @@ public abstract class AbstractParameters implements 
IParameters {
         }
     }
 
-    public void dealOutParam(String result) {
+    public void dealOutParam(Map<String, String> taskOutputParams) {
         if (CollectionUtils.isEmpty(localParams)) {
             return;
         }
@@ -138,19 +138,18 @@ public abstract class AbstractParameters implements 
IParameters {
         if (CollectionUtils.isEmpty(outProperty)) {
             return;
         }
-        if (StringUtils.isEmpty(result)) {
+        if (MapUtils.isEmpty(taskOutputParams)) {
             outProperty.forEach(this::addPropertyToValPool);
             return;
         }
-        Map<String, String> taskResult = getMapByString(result);
-        if (taskResult.size() == 0) {
-            return;
-        }
+
         for (Property info : outProperty) {
-            String propValue = taskResult.get(info.getProp());
+            String propValue = taskOutputParams.get(info.getProp());
             if (StringUtils.isNotEmpty(propValue)) {
                 info.setValue(propValue);
                 addPropertyToValPool(info);
+            } else {
+                log.warn("Cannot find the output parameter {} in the task 
output parameters", info.getProp());
             }
         }
     }
@@ -178,23 +177,6 @@ public abstract class AbstractParameters implements 
IParameters {
         return allParams;
     }
 
-    /**
-     * shell's result format is key=value$VarPool$key=value$VarPool$
-     * @param result
-     * @return
-     */
-    public static Map<String, String> getMapByString(String result) {
-        String[] formatResult = result.split("\\$VarPool\\$");
-        Map<String, String> format = new HashMap<>();
-        for (String info : formatResult) {
-            if (StringUtils.isNotEmpty(info) && info.contains("=")) {
-                String[] keyValue = info.split("=");
-                format.put(keyValue[0], keyValue[1]);
-            }
-        }
-        return format;
-    }
-
     public ResourceParametersHelper getResources() {
         return new ResourceParametersHelper();
     }
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/SqlParameters.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/SqlParameters.java
index bff2fe20d7..0f1a893a30 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/SqlParameters.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/SqlParameters.java
@@ -236,7 +236,6 @@ public class SqlParameters extends AbstractParameters {
         return new ArrayList<>();
     }
 
-    @Override
     public void dealOutParam(String result) {
         if (CollectionUtils.isEmpty(localParams)) {
             return;
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parser/TaskOutputParameterParser.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parser/TaskOutputParameterParser.java
new file mode 100644
index 0000000000..e79d44e4da
--- /dev/null
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parser/TaskOutputParameterParser.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.plugin.task.api.parser;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Used to parse ${setValue()} and #{setValue()} from given lines.
+ */
+@Slf4j
+@NotThreadSafe
+public class TaskOutputParameterParser {
+
+    private final Map<String, String> taskOutputParams = new HashMap<>();
+
+    private List<String> currentTaskOutputParam;
+
+    public void appendParseLog(String log) {
+        if (log == null) {
+            return;
+        }
+
+        if (currentTaskOutputParam != null) {
+            // continue to parse the rest of line
+            int i = log.indexOf(")}");
+            if (i == -1) {
+                // the end of var pool not found
+                currentTaskOutputParam.add(log);
+            } else {
+                // the end of var pool found
+                currentTaskOutputParam.add(log.substring(0, i + 2));
+                Pair<String, String> keyValue = 
parseOutputParam(String.join("\n", currentTaskOutputParam));
+                if (keyValue.getKey() != null && keyValue.getValue() != null) {
+                    taskOutputParams.put(keyValue.getKey(), 
keyValue.getValue());
+                }
+                currentTaskOutputParam = null;
+                // continue to parse the rest of line
+                if (i + 2 != log.length()) {
+                    appendParseLog(log.substring(i + 2));
+                }
+            }
+            return;
+        }
+
+        int indexOfVarPoolBegin = log.indexOf("${setValue(");
+        if (indexOfVarPoolBegin == -1) {
+            indexOfVarPoolBegin = log.indexOf("#{setValue(");
+        }
+        if (indexOfVarPoolBegin == -1) {
+            return;
+        }
+        currentTaskOutputParam = new ArrayList<>();
+        appendParseLog(log.substring(indexOfVarPoolBegin));
+    }
+
+    public Map<String, String> getTaskOutputParams() {
+        return taskOutputParams;
+    }
+
+    // #{setValue(xx=xx)}
+    protected Pair<String, String> parseOutputParam(String outputParam) {
+        if (StringUtils.isEmpty(outputParam)) {
+            log.info("The task output param is empty");
+            return ImmutablePair.nullPair();
+        }
+        if ((!outputParam.startsWith("${setValue(") && 
!outputParam.startsWith("#{setValue("))
+                || !outputParam.endsWith(")}")) {
+            log.info("The task output param {} should start with '${setValue(' 
or '#{setValue(' and end with ')}'",
+                    outputParam);
+            return ImmutablePair.nullPair();
+        }
+        String keyValueExpression = outputParam.substring(11, 
outputParam.length() - 2);
+        if (!keyValueExpression.contains("=")) {
+            log.warn("The task output param {} should composite with 
key=value", outputParam);
+            return ImmutablePair.nullPair();
+        }
+
+        String[] keyValue = keyValueExpression.split("=", 2);
+        return ImmutablePair.of(keyValue[0], keyValue[1]);
+    }
+
+}
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/VarPoolUtils.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/VarPoolUtils.java
deleted file mode 100644
index bb777e7126..0000000000
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/VarPoolUtils.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.plugin.task.api.utils;
-
-import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
-
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import lombok.experimental.UtilityClass;
-
-@UtilityClass
-public class VarPoolUtils {
-
-    static final Pattern DSVALUE_REGEX = 
Pattern.compile(TaskConstants.DSVALUE_REGEX);
-    public static final String VAR_SUFFIX = ")dsVal}";
-
-    public static final String VAR_DELIMITER = "$VarPool$";
-    /**
-     * find var pool
-     *
-     * @param line
-     * @return
-     */
-    public static String findVarPool(String line) {
-        Matcher matcher = DSVALUE_REGEX.matcher(line);
-        if (matcher.find()) {
-            return matcher.group(1);
-        }
-        return null;
-    }
-}
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTaskTest.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTaskTest.java
index 9cf407e00f..a7439aef67 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTaskTest.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTaskTest.java
@@ -38,24 +38,4 @@ public class AbstractTaskTest {
         Assertions.assertEquals(jobId, str.substring(6));
     }
 
-    @Test
-    public void testSetValue() {
-        Pattern SETVALUE_REGEX = Pattern.compile(TaskConstants.SETVALUE_REGEX);
-        String line1 = "${setValue(sql=\"INSERT INTO a VALUES (1, 2);\")}";
-        String line2 = "${setValue(a=2))}";
-        Matcher matcher1 = SETVALUE_REGEX.matcher(line1);
-        String str1 = null;
-        if (matcher1.find()) {
-            str1 = matcher1.group();
-        }
-        String str2 = null;
-        Matcher matcher2 = SETVALUE_REGEX.matcher(line2);
-        if (matcher2.find()) {
-            str2 = matcher2.group();
-        }
-        Assertions.assertNotNull(str1);
-        Assertions.assertNotNull(str2);
-        Assertions.assertEquals(str1.length(), line1.length());
-        Assertions.assertEquals(str2.length(), line2.length());
-    }
 }
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskExecutorTest.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskExecutorTest.java
index 2a793dc80a..4bbd29d18e 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskExecutorTest.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskExecutorTest.java
@@ -18,7 +18,6 @@
 package org.apache.dolphinscheduler.plugin.task.api.k8s;
 
 import static 
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_KILL;
-import static 
org.apache.dolphinscheduler.plugin.task.api.utils.VarPoolUtils.VAR_DELIMITER;
 
 import org.apache.dolphinscheduler.plugin.task.api.TaskException;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
@@ -65,7 +64,7 @@ public class K8sTaskExecutorTest {
         requirement.setKey("node-label");
         requirement.setOperator("In");
         requirement.setValues(Arrays.asList("1234", "123456"));
-        k8sTaskExecutor = new K8sTaskExecutor(logger, taskRequest);
+        k8sTaskExecutor = new K8sTaskExecutor(taskRequest);
         k8sTaskMainParameters = new K8sTaskMainParameters();
         k8sTaskMainParameters.setImage(image);
         k8sTaskMainParameters.setImagePullPolicy(imagePullPolicy);
@@ -102,10 +101,4 @@ public class K8sTaskExecutorTest {
         }
     }
 
-    @Test
-    public void testValpool() {
-        String result = "key=value" + VAR_DELIMITER;
-        k8sTaskExecutor.varPool.append(result);
-        Assertions.assertEquals(result, k8sTaskExecutor.getVarPool());
-    }
 }
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/parser/TaskOutputParameterParserTest.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/parser/TaskOutputParameterParserTest.java
new file mode 100644
index 0000000000..950ed822c0
--- /dev/null
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/parser/TaskOutputParameterParserTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.api.parser;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import com.google.common.collect.ImmutableMap;
+
+class TaskOutputParameterParserTest {
+
+    @Test
+    void testEmptyLog() throws IOException, URISyntaxException {
+        List<String> varPools = getLogs("/outputParam/emptyVarPoolLog.txt");
+        TaskOutputParameterParser taskOutputParameterParser = new 
TaskOutputParameterParser();
+        varPools.forEach(taskOutputParameterParser::appendParseLog);
+        
Assertions.assertTrue(taskOutputParameterParser.getTaskOutputParams().isEmpty());
+    }
+
+    @Test
+    void testOneLineLog() throws IOException, URISyntaxException {
+        List<String> varPools = getLogs("/outputParam/onelineVarPoolLog.txt");
+        TaskOutputParameterParser taskOutputParameterParser = new 
TaskOutputParameterParser();
+        varPools.forEach(taskOutputParameterParser::appendParseLog);
+        assertEquals(ImmutableMap.of("name", "name=tom"), 
taskOutputParameterParser.getTaskOutputParams());
+    }
+
+    @Test
+    void testOneVarPollInMultiLineLog() throws IOException, URISyntaxException 
{
+        List<String> varPools = 
getLogs("/outputParam/oneVarPollInMultiLineLog.txt");
+        TaskOutputParameterParser taskOutputParameterParser = new 
TaskOutputParameterParser();
+        varPools.forEach(taskOutputParameterParser::appendParseLog);
+        assertEquals(ImmutableMap.of("sql",
+                "select * from table\n" +
+                        "where\n" +
+                        "id = 1\n"),
+                taskOutputParameterParser.getTaskOutputParams());
+    }
+
+    @Test
+    void testVarPollInMultiLineLog() throws IOException, URISyntaxException {
+        List<String> varPools = getLogs("/outputParam/multipleVarPoll.txt");
+        TaskOutputParameterParser taskOutputParameterParser = new 
TaskOutputParameterParser();
+        varPools.forEach(taskOutputParameterParser::appendParseLog);
+        assertEquals(ImmutableMap.of("name", "tom", "age", "1"), 
taskOutputParameterParser.getTaskOutputParams());
+    }
+
+    private List<String> getLogs(String file) throws IOException, 
URISyntaxException {
+        URI uri = 
TaskOutputParameterParserTest.class.getResource(file).toURI();
+        return Files.lines(Paths.get(uri)).collect(Collectors.toList());
+    }
+}
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/utils/VarPoolUtilsTest.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/utils/VarPoolUtilsTest.java
deleted file mode 100644
index e083554bde..0000000000
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/utils/VarPoolUtilsTest.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.plugin.task.api.utils;
-
-import java.util.HashMap;
-
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-
-class VarPoolUtilsTest {
-
-    @Test
-    void findVar() {
-        HashMap<String, String> tcs = new HashMap<>();
-        tcs.put("${(set_val=123)dsVal}", "set_val=123");
-        tcs.put("1970-01-01 ${(set_val=123)dsVal}", "set_val=123");
-        tcs.put("1970-01-01 ${(set_val=123)dsVal}123", null);
-        tcs.put("${(set_val=123}dsVal", null);
-        tcs.put("#{(set_val=123)dsVal}", "set_val=123");
-        tcs.put("1970-01-01 #{(set_val=123)dsVal}", "set_val=123");
-        tcs.put("1970-01-01 #{(set_val=123)dsVal}123", null);
-        tcs.put("#{(set_val=123)dsVal}123", null);
-        tcs.put("#{(set_val=123dsVal}", null);
-
-        tcs.put("${(set_val=123)dsVal}${(set_val=456)dsVal}", 
"set_val=123)dsVal}${(set_val=456");
-        tcs.put("1970-01-01$#{(set_val=123)dsVal}", "set_val=123");
-        tcs.put("1970-01-01{(set_val=123)dsVal}123", null);
-        tcs.put("1970-01-01$#{(${(set_val=123)})dsVal}", "${(set_val=123)}");
-        tcs.put("1970-01-01$#{(${(set_val=123\\)})dsVal}", 
"${(set_val=123\\)}");
-
-        for (String tc : tcs.keySet()) {
-            Assertions.assertEquals(tcs.get(tc), VarPoolUtils.findVarPool(tc));
-        }
-    }
-}
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/outputParam/emptyVarPoolLog.txt
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/outputParam/emptyVarPoolLog.txt
new file mode 100644
index 0000000000..a9fd83fea0
--- /dev/null
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/outputParam/emptyVarPoolLog.txt
@@ -0,0 +1,16 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
\ No newline at end of file
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/outputParam/multipleVarPoll.txt
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/outputParam/multipleVarPoll.txt
new file mode 100644
index 0000000000..994cee503e
--- /dev/null
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/outputParam/multipleVarPoll.txt
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+INFO: ${setValue(name=tom)}
+INFO: ${setValue(age=1)}
\ No newline at end of file
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/outputParam/oneVarPollInMultiLineLog.txt
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/outputParam/oneVarPollInMultiLineLog.txt
new file mode 100644
index 0000000000..b26467cb27
--- /dev/null
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/outputParam/oneVarPollInMultiLineLog.txt
@@ -0,0 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+${setValue(sql=select * from table
+where
+id = 1
+)}
\ No newline at end of file
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/outputParam/onelineVarPoolLog.txt
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/outputParam/onelineVarPoolLog.txt
new file mode 100644
index 0000000000..6d80b15695
--- /dev/null
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/outputParam/onelineVarPoolLog.txt
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+${setValue(name=name=tom)}
\ No newline at end of file
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/src/main/java/org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/src/main/java/org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunTask.java
index 2e4731d704..e3814a076d 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/src/main/java/org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunTask.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/src/main/java/org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunTask.java
@@ -45,9 +45,9 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
-/**
- * chunjun task
- */
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
 public class ChunJunTask extends AbstractTask {
 
     /**
@@ -79,8 +79,7 @@ public class ChunJunTask extends AbstractTask {
         super(taskExecutionContext);
         this.taskExecutionContext = taskExecutionContext;
 
-        this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
-                taskExecutionContext, log);
+        this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, 
taskExecutionContext);
     }
 
     /**
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-datafactory/src/main/java/org/apache/dolphinscheduler/plugin/task/datafactory/DatafactoryTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-datafactory/src/main/java/org/apache/dolphinscheduler/plugin/task/datafactory/DatafactoryTask.java
index e36f0659d3..394a58f417 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-datafactory/src/main/java/org/apache/dolphinscheduler/plugin/task/datafactory/DatafactoryTask.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-datafactory/src/main/java/org/apache/dolphinscheduler/plugin/task/datafactory/DatafactoryTask.java
@@ -30,9 +30,11 @@ import java.util.List;
 
 import lombok.Getter;
 import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
 
 @Setter
 @Getter
+@Slf4j
 public class DatafactoryTask extends AbstractRemoteTask {
 
     private final TaskExecutionContext taskExecutionContext;
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-dataquality/src/main/java/org/apache/dolphinscheduler/plugin/task/dq/DataQualityTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-dataquality/src/main/java/org/apache/dolphinscheduler/plugin/task/dq/DataQualityTask.java
index 381b8cbd37..1bf1a6454c 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-dataquality/src/main/java/org/apache/dolphinscheduler/plugin/task/dq/DataQualityTask.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-dataquality/src/main/java/org/apache/dolphinscheduler/plugin/task/dq/DataQualityTask.java
@@ -58,11 +58,14 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import lombok.extern.slf4j.Slf4j;
+
 /**
  * In DataQualityTask, the input parameters will be converted into 
DataQualityConfiguration,
  * which will be converted into a string as the parameter of 
DataQualityApplication,
  * and DataQualityApplication is spark application
  */
+@Slf4j
 public class DataQualityTask extends AbstractYarnTask {
 
     /**
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-datasync/src/main/java/org/apache/dolphinscheduler/plugin/task/datasync/DatasyncTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-datasync/src/main/java/org/apache/dolphinscheduler/plugin/task/datasync/DatasyncTask.java
index c9e3afd29b..2353dc5c4a 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-datasync/src/main/java/org/apache/dolphinscheduler/plugin/task/datasync/DatasyncTask.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-datasync/src/main/java/org/apache/dolphinscheduler/plugin/task/datasync/DatasyncTask.java
@@ -36,6 +36,7 @@ import java.util.Collections;
 import java.util.List;
 
 import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -43,6 +44,7 @@ import 
com.fasterxml.jackson.databind.PropertyNamingStrategies;
 import com.fasterxml.jackson.databind.json.JsonMapper;
 
 @Setter
+@Slf4j
 public class DatasyncTask extends AbstractRemoteTask {
 
     private static final ObjectMapper objectMapper =
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java
index 9b55f00fc9..6490b8b0b0 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java
@@ -56,6 +56,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
 
+import lombok.extern.slf4j.Slf4j;
+
 import com.alibaba.druid.sql.ast.SQLStatement;
 import com.alibaba.druid.sql.ast.expr.SQLIdentifierExpr;
 import com.alibaba.druid.sql.ast.expr.SQLPropertyExpr;
@@ -68,6 +70,7 @@ import com.alibaba.druid.sql.parser.SQLStatementParser;
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 
+@Slf4j
 public class DataxTask extends AbstractTask {
 
     /**
@@ -125,8 +128,7 @@ public class DataxTask extends AbstractTask {
         super(taskExecutionContext);
         this.taskExecutionContext = taskExecutionContext;
 
-        this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
-                taskExecutionContext, log);
+        this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, 
taskExecutionContext);
     }
 
     /**
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-dinky/src/main/java/org/apache/dolphinscheduler/plugin/task/dinky/DinkyTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-dinky/src/main/java/org/apache/dolphinscheduler/plugin/task/dinky/DinkyTask.java
index fdaf70add4..694317122e 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-dinky/src/main/java/org/apache/dolphinscheduler/plugin/task/dinky/DinkyTask.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-dinky/src/main/java/org/apache/dolphinscheduler/plugin/task/dinky/DinkyTask.java
@@ -45,28 +45,20 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import lombok.extern.slf4j.Slf4j;
+
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.MissingNode;
 
+@Slf4j
 public class DinkyTask extends AbstractRemoteTask {
 
-    /**
-     * taskExecutionContext
-     */
     private final TaskExecutionContext taskExecutionContext;
 
-    /**
-     * dinky parameters
-     */
     private DinkyParameters dinkyParameters;
 
-    /**
-     * constructor
-     *
-     * @param taskExecutionContext taskExecutionContext
-     */
     protected DinkyTask(TaskExecutionContext taskExecutionContext) {
         super(taskExecutionContext);
         this.taskExecutionContext = taskExecutionContext;
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-dms/src/main/java/org/apache/dolphinscheduler/plugin/task/dms/DmsTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-dms/src/main/java/org/apache/dolphinscheduler/plugin/task/dms/DmsTask.java
index 74cf5b9b29..29424a5b76 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-dms/src/main/java/org/apache/dolphinscheduler/plugin/task/dms/DmsTask.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-dms/src/main/java/org/apache/dolphinscheduler/plugin/task/dms/DmsTask.java
@@ -35,12 +35,15 @@ import org.apache.commons.lang3.StringUtils;
 import java.util.Collections;
 import java.util.List;
 
+import lombok.extern.slf4j.Slf4j;
+
 import 
com.amazonaws.services.databasemigrationservice.model.InvalidResourceStateException;
 import com.amazonaws.services.databasemigrationservice.model.ReplicationTask;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.PropertyNamingStrategy;
 import com.fasterxml.jackson.databind.json.JsonMapper;
 
+@Slf4j
 public class DmsTask extends AbstractRemoteTask {
 
     private static final ObjectMapper objectMapper =
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java
index 032d953858..d1a8bf8b1b 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java
@@ -33,9 +33,9 @@ import 
org.apache.dolphinscheduler.plugin.task.api.shell.ShellInterceptorBuilder
 import java.util.ArrayList;
 import java.util.List;
 
-/**
- * shell task
- */
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
 public class DvcTask extends AbstractTask {
 
     /**
@@ -62,7 +62,7 @@ public class DvcTask extends AbstractTask {
         super(taskExecutionContext);
 
         this.taskExecutionContext = taskExecutionContext;
-        this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, 
taskExecutionContext, log);
+        this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, 
taskExecutionContext);
     }
 
     @Override
@@ -85,7 +85,7 @@ public class DvcTask extends AbstractTask {
             TaskResponse commandExecuteResult = 
shellCommandExecutor.run(shellActuatorBuilder, taskCallBack);
             setExitStatusCode(commandExecuteResult.getExitStatusCode());
             setProcessId(commandExecuteResult.getProcessId());
-            parameters.dealOutParam(shellCommandExecutor.getVarPool());
+            
parameters.dealOutParam(shellCommandExecutor.getTaskOutputParams());
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             log.error("The current DvcTask has been interrupted", e);
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/AbstractEmrTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/AbstractEmrTask.java
index b4603c4605..6f6ec63a29 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/AbstractEmrTask.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/AbstractEmrTask.java
@@ -31,6 +31,8 @@ import 
org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters
 
 import java.util.TimeZone;
 
+import lombok.extern.slf4j.Slf4j;
+
 import com.amazonaws.auth.AWSCredentialsProvider;
 import com.amazonaws.auth.AWSStaticCredentialsProvider;
 import com.amazonaws.auth.BasicAWSCredentials;
@@ -44,6 +46,7 @@ import com.fasterxml.jackson.databind.PropertyNamingStrategy;
  *
  * @since v3.1.0
  */
+@Slf4j
 public abstract class AbstractEmrTask extends AbstractRemoteTask {
 
     final TaskExecutionContext taskExecutionContext;
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTask.java
index ff3331d530..753b206e21 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTask.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTask.java
@@ -26,6 +26,8 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
+import lombok.extern.slf4j.Slf4j;
+
 import com.amazonaws.SdkBaseException;
 import com.amazonaws.services.elasticmapreduce.model.AddJobFlowStepsRequest;
 import com.amazonaws.services.elasticmapreduce.model.AddJobFlowStepsResult;
@@ -45,6 +47,7 @@ import com.google.common.collect.Sets;
  *
  * @since v3.1.0
  */
+@Slf4j
 public class EmrAddStepsTask extends AbstractEmrTask {
 
     private String stepId;
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTask.java
index 9abff26818..f4b0534065 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTask.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTask.java
@@ -26,6 +26,8 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
+import lombok.extern.slf4j.Slf4j;
+
 import com.amazonaws.SdkBaseException;
 import com.amazonaws.services.elasticmapreduce.model.ClusterState;
 import com.amazonaws.services.elasticmapreduce.model.ClusterStateChangeReason;
@@ -40,6 +42,7 @@ import 
com.amazonaws.services.elasticmapreduce.model.TerminateJobFlowsResult;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.common.collect.Sets;
 
+@Slf4j
 public class EmrJobFlowTask extends AbstractEmrTask {
 
     private final HashSet<String> waitingStateSet = Sets.newHashSet(
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTask.java
index 3a71df62c7..9ec10889bb 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTask.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTask.java
@@ -31,17 +31,14 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
 public class FlinkStreamTask extends FlinkTask implements StreamTask {
 
-    /**
-     * flink parameters
-     */
     private FlinkStreamParameters flinkParameters;
 
-    /**
-     * taskExecutionContext
-     */
-    private TaskExecutionContext taskExecutionContext;
+    private final TaskExecutionContext taskExecutionContext;
 
     public FlinkStreamTask(TaskExecutionContext taskExecutionContext) {
         super(taskExecutionContext);
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java
index 75764c3677..4f2963116c 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java
@@ -29,6 +29,9 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
 public class FlinkTask extends AbstractYarnTask {
 
     /**
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-hivecli/src/main/java/org/apache/dolphinscheduler/plugin/task/hivecli/HiveCliTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-hivecli/src/main/java/org/apache/dolphinscheduler/plugin/task/hivecli/HiveCliTask.java
index 28443423c2..0ca6a2c39a 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-hivecli/src/main/java/org/apache/dolphinscheduler/plugin/task/hivecli/HiveCliTask.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-hivecli/src/main/java/org/apache/dolphinscheduler/plugin/task/hivecli/HiveCliTask.java
@@ -53,6 +53,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
 public class HiveCliTask extends AbstractRemoteTask {
 
     private HiveCliParameters hiveCliParameters;
@@ -65,9 +68,7 @@ public class HiveCliTask extends AbstractRemoteTask {
         super(taskExecutionContext);
         this.taskExecutionContext = taskExecutionContext;
 
-        this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
-                taskExecutionContext,
-                log);
+        this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, 
taskExecutionContext);
     }
 
     @Override
@@ -96,7 +97,7 @@ public class HiveCliTask extends AbstractRemoteTask {
             setExitStatusCode(taskResponse.getExitStatusCode());
             setAppIds(taskResponse.getAppIds());
             setProcessId(taskResponse.getProcessId());
-            setVarPool(shellCommandExecutor.getVarPool());
+            setTaskOutputParams(shellCommandExecutor.getTaskOutputParams());
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             log.error("The current HiveCLI Task has been interrupted", e);
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTask.java
index 8d696a19b1..d51c99b2a7 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTask.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTask.java
@@ -52,8 +52,11 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
+import lombok.extern.slf4j.Slf4j;
+
 import com.fasterxml.jackson.databind.node.ObjectNode;
 
+@Slf4j
 public class HttpTask extends AbstractTask {
 
     /**
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/main/java/org/apache/dolphinscheduler/plugin/task/java/JavaTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/main/java/org/apache/dolphinscheduler/plugin/task/java/JavaTask.java
index e9850b6459..5b4811bf8b 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/main/java/org/apache/dolphinscheduler/plugin/task/java/JavaTask.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/main/java/org/apache/dolphinscheduler/plugin/task/java/JavaTask.java
@@ -52,8 +52,11 @@ import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import lombok.extern.slf4j.Slf4j;
+
 import com.google.common.base.Preconditions;
 
+@Slf4j
 public class JavaTask extends AbstractTask {
 
     /**
@@ -79,9 +82,7 @@ public class JavaTask extends AbstractTask {
     public JavaTask(TaskExecutionContext taskRequest) {
         super(taskRequest);
         this.taskRequest = taskRequest;
-        this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
-                taskRequest,
-                log);
+        this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, 
taskRequest);
     }
 
     /**
@@ -131,7 +132,7 @@ public class JavaTask extends AbstractTask {
             setExitStatusCode(taskResponse.getExitStatusCode());
             setAppIds(taskResponse.getAppIds());
             setProcessId(taskResponse.getProcessId());
-            setVarPool(shellCommandExecutor.getVarPool());
+            setTaskOutputParams(shellCommandExecutor.getTaskOutputParams());
         } catch (InterruptedException e) {
             log.error("java task interrupted ", e);
             setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java
index 735b6237e2..5f79b139a9 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java
@@ -41,8 +41,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import lombok.extern.slf4j.Slf4j;
+
 import com.fasterxml.jackson.databind.ObjectMapper;
 
+@Slf4j
 public class JupyterTask extends AbstractRemoteTask {
 
     private JupyterParameters jupyterParameters;
@@ -54,9 +57,7 @@ public class JupyterTask extends AbstractRemoteTask {
     public JupyterTask(TaskExecutionContext taskExecutionContext) {
         super(taskExecutionContext);
         this.taskExecutionContext = taskExecutionContext;
-        this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
-                taskExecutionContext,
-                log);
+        this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, 
taskExecutionContext);
     }
 
     @Override
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTask.java
index c02adbfa76..fceb29e163 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTask.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTask.java
@@ -43,26 +43,19 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import lombok.extern.slf4j.Slf4j;
 import io.fabric8.kubernetes.api.model.NodeSelectorRequirement;
 
+@Slf4j
 public class K8sTask extends AbstractK8sTask {
 
-    /**
-     * taskExecutionContext
-     */
     private final TaskExecutionContext taskExecutionContext;
 
-    /**
-     * task parameters
-     */
     private K8sTaskParameters k8sTaskParameters;
 
     private K8sTaskExecutionContext k8sTaskExecutionContext;
 
     private K8sConnectionParam k8sConnectionParam;
-    /**
-     * @param taskRequest taskRequest
-     */
     public K8sTask(TaskExecutionContext taskRequest) {
         super(taskRequest);
         this.taskExecutionContext = taskRequest;
@@ -119,8 +112,8 @@ public class K8sTask extends AbstractK8sTask {
     }
 
     @Override
-    protected void dealOutParam(String result) {
-        this.k8sTaskParameters.dealOutParam(result);
+    protected void dealOutParam(Map<String, String> taskOutputParams) {
+        this.k8sTaskParameters.dealOutParam(taskOutputParams);
     }
 
     public List<NodeSelectorRequirement> 
convertToNodeSelectorRequirements(List<NodeSelectorExpression> expressions) {
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/test/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTaskTest.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/test/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTaskTest.java
index ef21d34e8e..3895190cf2 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/test/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTaskTest.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/test/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTaskTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.dolphinscheduler.plugin.task.k8s;
 
-import static 
org.apache.dolphinscheduler.plugin.task.api.utils.VarPoolUtils.VAR_DELIMITER;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
@@ -47,6 +46,7 @@ import org.junit.jupiter.api.Test;
 import org.mockito.MockedStatic;
 import org.mockito.Mockito;
 
+import com.google.common.collect.ImmutableMap;
 import io.fabric8.kubernetes.api.model.NodeSelectorRequirement;
 
 public class K8sTaskTest {
@@ -167,7 +167,7 @@ public class K8sTaskTest {
 
     @Test
     public void testDealOutParam() {
-        String result = "key=123" + VAR_DELIMITER;
+        Map<String, String> result = ImmutableMap.of("key", "123");
         k8sTask.getParameters().localParams.add(new Property("key", 
Direct.OUT, DataType.VARCHAR, "value"));
         k8sTask.dealOutParam(result);
         k8sTask.getParameters().getVarPool().forEach(property -> {
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-kubeflow/src/main/java/org/apache/dolphinscheduler/plugin/kubeflow/KubeflowTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-kubeflow/src/main/java/org/apache/dolphinscheduler/plugin/kubeflow/KubeflowTask.java
index 4ae9a95a5d..0a65cc160c 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-kubeflow/src/main/java/org/apache/dolphinscheduler/plugin/kubeflow/KubeflowTask.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-kubeflow/src/main/java/org/apache/dolphinscheduler/plugin/kubeflow/KubeflowTask.java
@@ -36,6 +36,9 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
 public class KubeflowTask extends AbstractRemoteTask {
 
     private final TaskExecutionContext taskExecutionContext;
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-linkis/src/main/java/org/apache/dolphinscheduler/plugin/task/linkis/LinkisTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-linkis/src/main/java/org/apache/dolphinscheduler/plugin/task/linkis/LinkisTask.java
index c091ee6f64..a4ab0b96a9 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-linkis/src/main/java/org/apache/dolphinscheduler/plugin/task/linkis/LinkisTask.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-linkis/src/main/java/org/apache/dolphinscheduler/plugin/task/linkis/LinkisTask.java
@@ -41,9 +41,9 @@ import java.util.List;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-/**
- * linkis task
- */
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
 public class LinkisTask extends AbstractRemoteTask {
 
     /**
@@ -76,9 +76,7 @@ public class LinkisTask extends AbstractRemoteTask {
         super(taskExecutionContext);
 
         this.taskExecutionContext = taskExecutionContext;
-        this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
-                taskExecutionContext,
-                log);
+        this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, 
taskExecutionContext);
     }
 
     @Override
@@ -107,7 +105,7 @@ public class LinkisTask extends AbstractRemoteTask {
             setExitStatusCode(commandExecuteResult.getExitStatusCode());
             setAppIds(findTaskId(commandExecuteResult.getResultString()));
             setProcessId(commandExecuteResult.getProcessId());
-            linkisParameters.dealOutParam(shellCommandExecutor.getVarPool());
+            
linkisParameters.dealOutParam(shellCommandExecutor.getTaskOutputParams());
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             log.error("The current Linkis task has been interrupted", e);
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java
index 3c1f73747c..bc97be04e6 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java
@@ -42,9 +42,9 @@ import java.util.Map;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
-/**
- * shell task
- */
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
 public class MlflowTask extends AbstractTask {
 
     private static final Pattern GIT_CHECK_PATTERN = 
Pattern.compile("^(git@|https?://)");
@@ -71,7 +71,7 @@ public class MlflowTask extends AbstractTask {
         super(taskExecutionContext);
 
         this.taskExecutionContext = taskExecutionContext;
-        this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, 
taskExecutionContext, log);
+        this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, 
taskExecutionContext);
     }
 
     static public String getPresetRepository() {
@@ -130,7 +130,7 @@ public class MlflowTask extends AbstractTask {
             }
             setExitStatusCode(exitCode);
             setProcessId(commandExecuteResult.getProcessId());
-            mlflowParameters.dealOutParam(shellCommandExecutor.getVarPool());
+            
mlflowParameters.dealOutParam(shellCommandExecutor.getTaskOutputParams());
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             log.error("The current Mlflow task has been interrupted", e);
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceTask.java
index dd51314dfa..0979dc73cd 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceTask.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceTask.java
@@ -30,9 +30,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
-/**
- * mapreduce task
- */
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
 public class MapReduceTask extends AbstractYarnTask {
 
     /**
@@ -41,20 +41,10 @@ public class MapReduceTask extends AbstractYarnTask {
      */
     private static final String MAPREDUCE_COMMAND = TaskConstants.HADOOP;
 
-    /**
-     * mapreduce parameters
-     */
     private MapReduceParameters mapreduceParameters;
 
-    /**
-     * taskExecutionContext
-     */
-    private TaskExecutionContext taskExecutionContext;
+    private final TaskExecutionContext taskExecutionContext;
 
-    /**
-     * constructor
-     * @param taskExecutionContext taskExecutionContext
-     */
     public MapReduceTask(TaskExecutionContext taskExecutionContext) {
         super(taskExecutionContext);
         this.taskExecutionContext = taskExecutionContext;
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-openmldb/src/main/java/org/apache/dolphinscheduler/plugin/task/openmldb/OpenmldbTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-openmldb/src/main/java/org/apache/dolphinscheduler/plugin/task/openmldb/OpenmldbTask.java
index 9142a364a1..6f97d0867c 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-openmldb/src/main/java/org/apache/dolphinscheduler/plugin/task/openmldb/OpenmldbTask.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-openmldb/src/main/java/org/apache/dolphinscheduler/plugin/task/openmldb/OpenmldbTask.java
@@ -32,11 +32,11 @@ import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import lombok.extern.slf4j.Slf4j;
+
 import com.google.common.base.Preconditions;
 
-/**
- * openmldb task
- */
+@Slf4j
 public class OpenmldbTask extends PythonTask {
 
     /**
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-pigeon/src/main/java/org/apache/dolphinscheduler/plugin/task/pigeon/PigeonTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-pigeon/src/main/java/org/apache/dolphinscheduler/plugin/task/pigeon/PigeonTask.java
index 8bab12133e..55af378e83 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-pigeon/src/main/java/org/apache/dolphinscheduler/plugin/task/pigeon/PigeonTask.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-pigeon/src/main/java/org/apache/dolphinscheduler/plugin/task/pigeon/PigeonTask.java
@@ -46,12 +46,15 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.stream.Collectors;
 
+import lombok.extern.slf4j.Slf4j;
+
 import org.java_websocket.client.WebSocketClient;
 import org.java_websocket.handshake.ServerHandshake;
 
 /**
  * TIS DataX Task
  **/
+@Slf4j
 public class PigeonTask extends AbstractRemoteTask {
 
     public static final String KEY_POOL_VAR_PIGEON_HOST = "p_host";
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java
index 2bdbcd5e23..6a66ba1965 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java
@@ -46,24 +46,18 @@ import java.sql.Types;
 import java.util.HashMap;
 import java.util.Map;
 
+import lombok.extern.slf4j.Slf4j;
+
 import com.google.common.collect.Maps;
 
-/**
- * procedure task
- */
+@Slf4j
 public class ProcedureTask extends AbstractTask {
 
-    /**
-     * procedure parameters
-     */
-    private ProcedureParameters procedureParameters;
+    private final ProcedureParameters procedureParameters;
 
-    /**
-     * taskExecutionContext
-     */
-    private TaskExecutionContext taskExecutionContext;
+    private final TaskExecutionContext taskExecutionContext;
 
-    private ProcedureTaskExecutionContext procedureTaskExecutionContext;
+    private final ProcedureTaskExecutionContext procedureTaskExecutionContext;
 
     /**
      * constructor
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java
index 3fd938d3a5..539ecd7240 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java
@@ -40,21 +40,15 @@ import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.Map;
 
+import lombok.extern.slf4j.Slf4j;
+
 import com.google.common.base.Preconditions;
 
-/**
- * python task
- */
+@Slf4j
 public class PythonTask extends AbstractTask {
 
-    /**
-     * python parameters
-     */
     protected PythonParameters pythonParameters;
 
-    /**
-     * shell command executor
-     */
     private ShellCommandExecutor shellCommandExecutor;
 
     protected TaskExecutionContext taskRequest;
@@ -70,9 +64,7 @@ public class PythonTask extends AbstractTask {
         super(taskRequest);
         this.taskRequest = taskRequest;
 
-        this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
-                taskRequest,
-                log);
+        this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, 
taskRequest);
     }
 
     @Override
@@ -104,8 +96,8 @@ public class PythonTask extends AbstractTask {
             TaskResponse taskResponse = 
shellCommandExecutor.run(shellActuatorBuilder, taskCallBack);
             setExitStatusCode(taskResponse.getExitStatusCode());
             setProcessId(taskResponse.getProcessId());
-            setVarPool(shellCommandExecutor.getVarPool());
-            pythonParameters.dealOutParam(shellCommandExecutor.getVarPool());
+            setTaskOutputParams(shellCommandExecutor.getTaskOutputParams());
+            
pythonParameters.dealOutParam(shellCommandExecutor.getTaskOutputParams());
         } catch (Exception e) {
             log.error("python task failure", e);
             setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTask.java
index ffefc83a8e..1d7a874dfc 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTask.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTask.java
@@ -34,6 +34,9 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.stream.Collectors;
 
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
 public class PytorchTask extends AbstractTask {
 
     private final ShellCommandExecutor shellCommandExecutor;
@@ -45,9 +48,7 @@ public class PytorchTask extends AbstractTask {
         super(taskExecutionContext);
         this.taskExecutionContext = taskExecutionContext;
 
-        this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
-                taskExecutionContext,
-                log);
+        this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, 
taskExecutionContext);
     }
 
     @Override
@@ -76,7 +77,7 @@ public class PytorchTask extends AbstractTask {
             TaskResponse taskResponse = 
shellCommandExecutor.run(shellActuatorBuilder, taskCallBack);
             setExitStatusCode(taskResponse.getExitStatusCode());
             setProcessId(taskResponse.getProcessId());
-            setVarPool(shellCommandExecutor.getVarPool());
+            setTaskOutputParams(shellCommandExecutor.getTaskOutputParams());
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             log.error("The current Pytorch task has been interrupted", e);
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-remoteshell/src/main/java/org/apache/dolphinscheduler/plugin/task/remoteshell/RemoteExecutor.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-remoteshell/src/main/java/org/apache/dolphinscheduler/plugin/task/remoteshell/RemoteExecutor.java
index 650bf84a69..4de28b7fb5 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-remoteshell/src/main/java/org/apache/dolphinscheduler/plugin/task/remoteshell/RemoteExecutor.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-remoteshell/src/main/java/org/apache/dolphinscheduler/plugin/task/remoteshell/RemoteExecutor.java
@@ -19,8 +19,8 @@ package org.apache.dolphinscheduler.plugin.task.remoteshell;
 
 import org.apache.dolphinscheduler.plugin.datasource.ssh.SSHUtils;
 import 
org.apache.dolphinscheduler.plugin.datasource.ssh.param.SSHConnectionParam;
-import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
 import org.apache.dolphinscheduler.plugin.task.api.TaskException;
+import 
org.apache.dolphinscheduler.plugin.task.api.parser.TaskOutputParameterParser;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.sshd.client.SshClient;
@@ -36,24 +36,19 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.EnumSet;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
+import java.util.HashMap;
+import java.util.Map;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
 
+@Slf4j
 public class RemoteExecutor {
 
-    protected final Logger logger =
-            
LoggerFactory.getLogger(String.format(TaskConstants.TASK_LOGGER_THREAD_NAME, 
getClass()));
-
-    protected static final Pattern SETVALUE_REGEX = 
Pattern.compile(TaskConstants.SETVALUE_REGEX);
-
     static final String REMOTE_SHELL_HOME = 
"/tmp/dolphinscheduler-remote-shell-%s/";
     static final String STATUS_TAG_MESSAGE = 
"DOLPHINSCHEDULER-REMOTE-SHELL-TASK-STATUS-";
     static final int TRACK_INTERVAL = 5000;
 
-    protected StringBuilder varPool = new StringBuilder();
+    protected Map<String, String> taskOutputParams = new HashMap<>();
 
     SshClient sshClient;
     ClientSession session;
@@ -105,60 +100,44 @@ public class RemoteExecutor {
     public void track(String taskId) throws Exception {
         int logN = 0;
         String pid;
-        logger.info("Remote shell task log:");
+        log.info("Remote shell task log:");
+        TaskOutputParameterParser taskOutputParameterParser = new 
TaskOutputParameterParser();
         do {
             pid = getTaskPid(taskId);
             String trackCommand = String.format(COMMAND.TRACK_COMMAND, logN + 
1, getRemoteShellHome(), taskId);
-            String log = runRemote(trackCommand);
-            if (StringUtils.isEmpty(log)) {
+            String logLine = runRemote(trackCommand);
+            if (StringUtils.isEmpty(logLine)) {
                 Thread.sleep(TRACK_INTERVAL);
             } else {
-                logN += log.split("\n").length;
-                setVarPool(log);
-                logger.info(log);
+                logN += logLine.split("\n").length;
+                log.info(logLine);
+                taskOutputParameterParser.appendParseLog(logLine);
             }
         } while (StringUtils.isNotEmpty(pid));
+        
taskOutputParams.putAll(taskOutputParameterParser.getTaskOutputParams());
     }
 
-    public String getVarPool() {
-        return varPool.toString();
-    }
-
-    private void setVarPool(String log) {
-        String[] lines = log.split("\n");
-        for (String line : lines) {
-            if (line.startsWith("${setValue(") || 
line.startsWith("#{setValue(")) {
-                varPool.append(findVarPool(line));
-                varPool.append("$VarPool$");
-            }
-        }
-    }
-
-    private String findVarPool(String line) {
-        Matcher matcher = SETVALUE_REGEX.matcher(line);
-        if (matcher.find()) {
-            return matcher.group(1);
-        }
-        return null;
+    public Map<String, String> getTaskOutputParams() {
+        return taskOutputParams;
     }
 
     public Integer getTaskExitCode(String taskId) throws IOException {
         String trackCommand = String.format(COMMAND.LOG_TAIL_COMMAND, 
getRemoteShellHome(), taskId);
-        String log = runRemote(trackCommand);
+        String logLine = runRemote(trackCommand);
         int exitCode = -1;
-        logger.info("Remote shell task run status: {}", log);
-        if (log.contains(STATUS_TAG_MESSAGE)) {
-            String status = log.replace(STATUS_TAG_MESSAGE, "").trim();
+        log.info("Remote shell task run status: {}", logLine);
+        if (logLine.contains(STATUS_TAG_MESSAGE)) {
+            String status = logLine.replace(STATUS_TAG_MESSAGE, "").trim();
             if (status.equals("0")) {
-                logger.info("Remote shell task success");
+                log.info("Remote shell task success");
                 exitCode = 0;
             } else {
-                logger.error("Remote shell task failed");
+                log.error("Remote shell task failed");
                 exitCode = Integer.parseInt(status);
             }
         }
         cleanData(taskId);
-        logger.error("Remote shell task failed");
+        log.error("Remote shell task failed");
         return exitCode;
     }
 
@@ -168,7 +147,7 @@ public class RemoteExecutor {
         try {
             runRemote(cleanCommand);
         } catch (Exception e) {
-            logger.error("Remote shell task clean data failed, but will not 
affect the task execution", e);
+            log.error("Remote shell task clean data failed, but will not 
affect the task execution", e);
         }
     }
 
@@ -189,14 +168,14 @@ public class RemoteExecutor {
         runRemote(checkDirCommand);
         uploadScript(taskId, localFile);
 
-        logger.info("The final script is: \n{}",
+        log.info("The final script is: \n{}",
                 runRemote(String.format(COMMAND.CAT_FINAL_SCRIPT, 
getRemoteShellHome(), taskId)));
     }
 
     public void uploadScript(String taskId, String localFile) throws 
IOException {
 
         String remotePath = getRemoteShellHome() + taskId + ".sh";
-        logger.info("upload script from local:{} to remote: {}", localFile, 
remotePath);
+        log.info("upload script from local:{} to remote: {}", localFile, 
remotePath);
         try (SftpFileSystem fs = 
SftpClientFactory.instance().createSftpFileSystem(getSession())) {
             Path path = fs.getPath(remotePath);
             Files.copy(Paths.get(localFile), path);
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-remoteshell/src/main/java/org/apache/dolphinscheduler/plugin/task/remoteshell/RemoteShellTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-remoteshell/src/main/java/org/apache/dolphinscheduler/plugin/task/remoteshell/RemoteShellTask.java
index 216203bfea..561392896c 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-remoteshell/src/main/java/org/apache/dolphinscheduler/plugin/task/remoteshell/RemoteShellTask.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-remoteshell/src/main/java/org/apache/dolphinscheduler/plugin/task/remoteshell/RemoteShellTask.java
@@ -42,9 +42,9 @@ import java.nio.file.Path;
 import java.nio.file.StandardOpenOption;
 import java.util.Map;
 
-/**
- * shell task
- */
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
 public class RemoteShellTask extends AbstractTask {
 
     static final String TASK_ID_PREFIX = "dolphinscheduler-remoteshell-";
@@ -102,7 +102,7 @@ public class RemoteShellTask extends AbstractTask {
             String localFile = buildCommand();
             int exitCode = remoteExecutor.run(taskId, localFile);
             setExitStatusCode(exitCode);
-            remoteShellParameters.dealOutParam(remoteExecutor.getVarPool());
+            
remoteShellParameters.dealOutParam(remoteExecutor.getTaskOutputParams());
         } catch (Exception e) {
             log.error("shell task error", e);
             setExitStatusCode(EXIT_CODE_FAILURE);
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTask.java
index d04f5a3fac..b1b2cc811f 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTask.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTask.java
@@ -39,6 +39,8 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
+import lombok.extern.slf4j.Slf4j;
+
 import com.amazonaws.auth.AWSCredentialsProvider;
 import com.amazonaws.auth.AWSStaticCredentialsProvider;
 import com.amazonaws.auth.BasicAWSCredentials;
@@ -52,6 +54,7 @@ import com.fasterxml.jackson.databind.json.JsonMapper;
 /**
  * SagemakerTask task, Used to start Sagemaker pipeline
  */
+@Slf4j
 public class SagemakerTask extends AbstractRemoteTask {
 
     private static final ObjectMapper objectMapper = JsonMapper.builder()
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java
index 0b837fa568..b6d0b6136e 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java
@@ -47,9 +47,9 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
-/**
- * seatunnel task
- */
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
 public class SeatunnelTask extends AbstractRemoteTask {
 
     private static final String SEATUNNEL_BIN_DIR = "${SEATUNNEL_HOME}/bin/";
@@ -78,9 +78,7 @@ public class SeatunnelTask extends AbstractRemoteTask {
         super(taskExecutionContext);
 
         this.taskExecutionContext = taskExecutionContext;
-        this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
-                taskExecutionContext,
-                log);
+        this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, 
taskExecutionContext);
     }
 
     @Override
@@ -109,7 +107,7 @@ public class SeatunnelTask extends AbstractRemoteTask {
             setExitStatusCode(commandExecuteResult.getExitStatusCode());
             setAppIds(String.join(TaskConstants.COMMA, getApplicationIds()));
             setProcessId(commandExecuteResult.getProcessId());
-            
seatunnelParameters.dealOutParam(shellCommandExecutor.getVarPool());
+            
seatunnelParameters.dealOutParam(shellCommandExecutor.getTaskOutputParams());
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             log.error("The current SeaTunnel task has been interrupted", e);
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java
index b657f9a9ef..71cfb9522c 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java
@@ -31,9 +31,9 @@ import 
org.apache.dolphinscheduler.plugin.task.api.shell.IShellInterceptorBuilde
 import 
org.apache.dolphinscheduler.plugin.task.api.shell.ShellInterceptorBuilderFactory;
 import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
 
-/**
- * shell task
- */
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
 public class ShellTask extends AbstractTask {
 
     /**
@@ -60,9 +60,7 @@ public class ShellTask extends AbstractTask {
         super(taskExecutionContext);
 
         this.taskExecutionContext = taskExecutionContext;
-        this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
-                taskExecutionContext,
-                log);
+        this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, 
taskExecutionContext);
     }
 
     @Override
@@ -87,7 +85,7 @@ public class ShellTask extends AbstractTask {
             TaskResponse commandExecuteResult = 
shellCommandExecutor.run(shellActuatorBuilder, taskCallBack);
             setExitStatusCode(commandExecuteResult.getExitStatusCode());
             setProcessId(commandExecuteResult.getProcessId());
-            shellParameters.dealOutParam(shellCommandExecutor.getVarPool());
+            
shellParameters.dealOutParam(shellCommandExecutor.getTaskOutputParams());
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             log.error("The current Shell task has been interrupted", e);
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java
index ef0b71c8fe..99a9d9e61b 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java
@@ -53,19 +53,15 @@ import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import lombok.extern.slf4j.Slf4j;
 import io.fabric8.kubernetes.client.Config;
 
+@Slf4j
 public class SparkTask extends AbstractYarnTask {
 
-    /**
-     * spark parameters
-     */
     private SparkParameters sparkParameters;
 
-    /**
-     * taskExecutionContext
-     */
-    private TaskExecutionContext taskExecutionContext;
+    private final TaskExecutionContext taskExecutionContext;
 
     public SparkTask(TaskExecutionContext taskExecutionContext) {
         super(taskExecutionContext);
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java
index 091dfc5794..77886db59c 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java
@@ -61,24 +61,18 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
+import lombok.extern.slf4j.Slf4j;
+
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 
+@Slf4j
 public class SqlTask extends AbstractTask {
 
-    /**
-     * taskExecutionContext
-     */
-    private TaskExecutionContext taskExecutionContext;
+    private final TaskExecutionContext taskExecutionContext;
 
-    /**
-     * sql parameters
-     */
-    private SqlParameters sqlParameters;
+    private final SqlParameters sqlParameters;
 
-    /**
-     * base datasource
-     */
     private BaseConnectionParam baseConnectionParam;
 
     /**
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/SqoopTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/SqoopTask.java
index 016a07a56b..86aea61f0c 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/SqoopTask.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/SqoopTask.java
@@ -29,19 +29,16 @@ import 
org.apache.dolphinscheduler.plugin.task.sqoop.parameter.SqoopParameters;
 
 import java.util.Map;
 
+import lombok.extern.slf4j.Slf4j;
+
 /**
  * sqoop task extends the shell task
  */
+@Slf4j
 public class SqoopTask extends AbstractYarnTask {
 
-    /**
-     * sqoop task params
-     */
     private SqoopParameters sqoopParameters;
 
-    /**
-     * taskExecutionContext
-     */
     private final TaskExecutionContext taskExecutionContext;
 
     private SqoopTaskExecutionContext sqoopTaskExecutionContext;
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java
index 01459111b5..c3b580de35 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java
@@ -42,19 +42,15 @@ import java.util.List;
 import java.util.Map;
 
 import kong.unirest.Unirest;
+import lombok.extern.slf4j.Slf4j;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 
+@Slf4j
 public class ZeppelinTask extends AbstractRemoteTask {
 
-    /**
-     * taskExecutionContext
-     */
     private final TaskExecutionContext taskExecutionContext;
 
-    /**
-     * zeppelin parameters
-     */
     private ZeppelinParameters zeppelinParameters;
 
     /**
@@ -66,11 +62,6 @@ public class ZeppelinTask extends AbstractRemoteTask {
 
     private ZeppelinTaskExecutionContext zeppelinTaskExecutionContext;
 
-    /**
-     * constructor
-     *
-     * @param taskExecutionContext taskExecutionContext
-     */
     protected ZeppelinTask(TaskExecutionContext taskExecutionContext) {
         super(taskExecutionContext);
         this.taskExecutionContext = taskExecutionContext;

Reply via email to