This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 61915a2d5d Fix workflow instance restart failed due to duplicate key in varpool (#16001)
61915a2d5d is described below

commit 61915a2d5dfbb0470d536d8954529029e0fbf120
Author: Wenjun Ruan <[email protected]>
AuthorDate: Thu May 16 11:32:27 2024 +0800

    Fix workflow instance restart failed due to duplicate key in varpool (#16001)
---
 .../dolphinscheduler/api/ApiApplicationServer.java |   2 +
 .../api/service/impl/TaskInstanceServiceImpl.java  |  13 +-
 .../api/plugin/DataSourceProcessorProvider.java    |   4 +
 .../server/master/MasterServer.java                |   2 +
 .../master/runner/WorkflowExecuteRunnable.java     | 200 ++++-----------------
 .../master/runner/WorkflowExecuteRunnableTest.java |  19 +-
 .../plugin/task/api/model/Property.java            |  59 +-----
 .../task/api/parameters/AbstractParameters.java    |  44 ++---
 .../plugin/task/api/parameters/SqlParameters.java  |   8 +-
 .../plugin/task/api/utils/VarPoolUtils.java        | 119 ++++++++++++
 .../plugin/task/api/utils/VarPoolUtilsTest.java    |  62 +++++++
 .../server/worker/WorkerServer.java                |   2 +
 .../server/worker/runner/WorkerTaskExecutor.java   |   1 +
 .../worker/utils/TaskFilesTransferUtils.java       |  22 ++-
 14 files changed, 281 insertions(+), 276 deletions(-)

diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java
index 6f7d8f43d2..f8b705179a 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java
@@ -24,6 +24,7 @@ import 
org.apache.dolphinscheduler.common.thread.DefaultUncaughtExceptionHandler
 import org.apache.dolphinscheduler.dao.DaoConfiguration;
 import org.apache.dolphinscheduler.dao.PluginDao;
 import org.apache.dolphinscheduler.dao.entity.PluginDefine;
+import 
org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceProcessorProvider;
 import org.apache.dolphinscheduler.plugin.storage.api.StorageConfiguration;
 import org.apache.dolphinscheduler.plugin.task.api.TaskChannelFactory;
 import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
@@ -69,6 +70,7 @@ public class ApiApplicationServer {
         log.info("Received spring application context ready event will load 
taskPlugin and write to DB");
         // install task plugin
         TaskPluginManager.loadPlugin();
+        DataSourceProcessorProvider.initialize();
         for (Map.Entry<String, TaskChannelFactory> entry : 
TaskPluginManager.getTaskChannelFactoryMap().entrySet()) {
             String taskPluginName = entry.getKey();
             TaskChannelFactory taskChannelFactory = entry.getValue();
diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
index 7469d8db13..49e42da561 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
@@ -367,10 +367,15 @@ public class TaskInstanceServiceImpl extends 
BaseServiceImpl implements TaskInst
         }
         for (TaskInstance taskInstance : needToDeleteTaskInstances) {
             if (StringUtils.isNotBlank(taskInstance.getLogPath())) {
-                ILogService iLogService =
-                        
SingletonJdkDynamicRpcClientProxyFactory.getProxyClient(taskInstance.getHost(),
-                                ILogService.class);
-                iLogService.removeTaskInstanceLog(taskInstance.getLogPath());
+                try {
+                    // Remove task instance log failed will not affect the 
deletion of task instance
+                    ILogService iLogService =
+                            
SingletonJdkDynamicRpcClientProxyFactory.getProxyClient(taskInstance.getHost(),
+                                    ILogService.class);
+                    
iLogService.removeTaskInstanceLog(taskInstance.getLogPath());
+                } catch (Exception ex) {
+                    log.error("Remove task instance log error", ex);
+                }
             }
         }
 
diff --git 
a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourceProcessorProvider.java
 
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourceProcessorProvider.java
index 751ac1ba08..973421615f 100644
--- 
a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourceProcessorProvider.java
+++ 
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourceProcessorProvider.java
@@ -37,6 +37,10 @@ public class DataSourceProcessorProvider {
     private DataSourceProcessorProvider() {
     }
 
+    public static void initialize() {
+        log.info("Initialize DataSourceProcessorProvider");
+    }
+
     public static DataSourceProcessor getDataSourceProcessor(@NonNull DbType 
dbType) {
         return 
dataSourcePluginManager.getDataSourceProcessorMap().get(dbType.name());
     }
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
index 7988570135..9b507500b9 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
@@ -26,6 +26,7 @@ import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.dao.DaoConfiguration;
 import org.apache.dolphinscheduler.meter.metrics.MetricsProvider;
 import org.apache.dolphinscheduler.meter.metrics.SystemMetrics;
+import 
org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceProcessorProvider;
 import org.apache.dolphinscheduler.plugin.storage.api.StorageConfiguration;
 import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
 import org.apache.dolphinscheduler.registry.api.RegistryConfiguration;
@@ -108,6 +109,7 @@ public class MasterServer implements IStoppable {
 
         // install task plugin
         TaskPluginManager.loadPlugin();
+        DataSourceProcessorProvider.initialize();
 
         this.masterSlotManager.start();
 
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index 725b7e000a..72c52922d7 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -57,11 +57,10 @@ import 
org.apache.dolphinscheduler.extract.worker.ITaskInstanceOperator;
 import 
org.apache.dolphinscheduler.extract.worker.transportor.UpdateWorkflowHostRequest;
 import 
org.apache.dolphinscheduler.extract.worker.transportor.UpdateWorkflowHostResponse;
 import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
-import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
 import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
-import org.apache.dolphinscheduler.plugin.task.api.model.Property;
 import org.apache.dolphinscheduler.plugin.task.api.parameters.SwitchParameters;
 import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
+import org.apache.dolphinscheduler.plugin.task.api.utils.VarPoolUtils;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.master.event.StateEvent;
 import org.apache.dolphinscheduler.server.master.event.StateEventHandleError;
@@ -97,6 +96,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -376,7 +376,8 @@ public class WorkflowExecuteRunnable implements 
IWorkflowExecuteRunnable {
 
             if (taskInstance.getState().isSuccess()) {
                 completeTaskSet.add(taskInstance.getTaskCode());
-                mergeTaskInstanceVarPool(taskInstance);
+                
workflowInstance.setVarPool(VarPoolUtils.mergeVarPoolJsonString(
+                        Lists.newArrayList(workflowInstance.getVarPool(), 
taskInstance.getVarPool())));
                 processInstanceDao.upsertProcessInstance(workflowInstance);
                 ProjectUser projectUser =
                         
processService.queryProjectWithUserByProcessInstanceId(workflowInstance.getId());
@@ -441,7 +442,6 @@ public class WorkflowExecuteRunnable implements 
IWorkflowExecuteRunnable {
 
     /**
      * crate new task instance to retry, different objects from the original
-     *
      */
     private void retryTaskInstance(TaskInstance taskInstance) throws 
StateEventHandleException {
         ProcessInstance workflowInstance = 
workflowExecuteContext.getWorkflowInstance();
@@ -532,16 +532,6 @@ public class WorkflowExecuteRunnable implements 
IWorkflowExecuteRunnable {
         }
     }
 
-    /**
-     * check if task instance exist by id
-     */
-    public boolean checkTaskInstanceById(int taskInstanceId) {
-        if (taskInstanceMap.isEmpty()) {
-            return false;
-        }
-        return taskInstanceMap.containsKey(taskInstanceId);
-    }
-
     /**
      * get task instance from memory
      */
@@ -1070,7 +1060,7 @@ public class WorkflowExecuteRunnable implements 
IWorkflowExecuteRunnable {
      * new a taskInstance
      *
      * @param processInstance process instance
-     * @param taskNode task node
+     * @param taskNode        task node
      * @return task instance
      */
     public TaskInstance newTaskInstance(ProcessInstance processInstance, 
TaskNode taskNode) {
@@ -1161,80 +1151,32 @@ public class WorkflowExecuteRunnable implements 
IWorkflowExecuteRunnable {
         return taskInstance;
     }
 
-    public void getPreVarPool(TaskInstance taskInstance, Set<Long> preTask) {
+    void initializeTaskInstanceVarPool(TaskInstance taskInstance) {
+        // get pre task ,get all the task varPool to this task
+        // Do not use dag.getPreviousNodes because of the dag may be miss the 
upstream node
+        String preTasks =
+                
workflowExecuteContext.getWorkflowGraph().getTaskNodeByCode(taskInstance.getTaskCode()).getPreTasks();
+        Set<Long> preTaskList = new HashSet<>(JSONUtils.toList(preTasks, 
Long.class));
         ProcessInstance workflowInstance = 
workflowExecuteContext.getWorkflowInstance();
-        Map<String, Property> allProperty = new HashMap<>();
-        Map<String, TaskInstance> allTaskInstance = new HashMap<>();
-        if (CollectionUtils.isNotEmpty(preTask)) {
-            for (Long preTaskCode : preTask) {
-                Optional<TaskInstance> existTaskInstanceOptional = 
getTaskInstance(preTaskCode);
-                if (!existTaskInstanceOptional.isPresent()) {
-                    continue;
-                }
 
-                Integer taskId = existTaskInstanceOptional.get().getId();
-                if (taskId == null) {
-                    continue;
-                }
-                TaskInstance preTaskInstance = taskInstanceMap.get(taskId);
-                if (preTaskInstance == null) {
-                    continue;
-                }
-                String preVarPool = preTaskInstance.getVarPool();
-                if (StringUtils.isNotEmpty(preVarPool)) {
-                    List<Property> properties = JSONUtils.toList(preVarPool, 
Property.class);
-                    for (Property info : properties) {
-                        setVarPoolValue(allProperty, allTaskInstance, 
preTaskInstance, info);
-                    }
-                }
-            }
-            if (allProperty.size() > 0) {
-                
taskInstance.setVarPool(JSONUtils.toJsonString(allProperty.values()));
-            }
-        } else {
-            if (StringUtils.isNotEmpty(workflowInstance.getVarPool())) {
-                taskInstance.setVarPool(workflowInstance.getVarPool());
-            }
+        if (CollectionUtils.isEmpty(preTaskList)) {
+            taskInstance.setVarPool(workflowInstance.getVarPool());
+            return;
         }
+        List<String> preTaskInstanceVarPools = preTaskList
+                .stream()
+                .map(taskCode -> getTaskInstance(taskCode).orElse(null))
+                .filter(Objects::nonNull)
+                .sorted(Comparator.comparing(TaskInstance::getEndTime))
+                .map(TaskInstance::getVarPool)
+                .collect(Collectors.toList());
+        
taskInstance.setVarPool(VarPoolUtils.mergeVarPoolJsonString(preTaskInstanceVarPools));
     }
 
     public Collection<TaskInstance> getAllTaskInstances() {
         return taskInstanceMap.values();
     }
 
-    private void setVarPoolValue(Map<String, Property> allProperty,
-                                 Map<String, TaskInstance> allTaskInstance,
-                                 TaskInstance preTaskInstance, Property 
thisProperty) {
-        // for this taskInstance all the param in this part is IN.
-        thisProperty.setDirect(Direct.IN);
-        // get the pre taskInstance Property's name
-        String proName = thisProperty.getProp();
-        // if the Previous nodes have the Property of same name
-        if (allProperty.containsKey(proName)) {
-            // comparison the value of two Property
-            Property otherPro = allProperty.get(proName);
-            // if this property'value of loop is empty,use the other,whether 
the other's value is empty or not
-            if (StringUtils.isEmpty(thisProperty.getValue())) {
-                allProperty.put(proName, otherPro);
-                // if property'value of loop is not empty,and the other's 
value is not empty too, use the earlier value
-            } else if (StringUtils.isNotEmpty(otherPro.getValue())) {
-                TaskInstance otherTask = allTaskInstance.get(proName);
-                if (otherTask.getEndTime().getTime() > 
preTaskInstance.getEndTime().getTime()) {
-                    allProperty.put(proName, thisProperty);
-                    allTaskInstance.put(proName, preTaskInstance);
-                } else {
-                    allProperty.put(proName, otherPro);
-                }
-            } else {
-                allProperty.put(proName, thisProperty);
-                allTaskInstance.put(proName, preTaskInstance);
-            }
-        } else {
-            allProperty.put(proName, thisProperty);
-            allTaskInstance.put(proName, preTaskInstance);
-        }
-    }
-
     /**
      * get complete task instance map, taskCode as key
      */
@@ -1311,45 +1253,10 @@ public class WorkflowExecuteRunnable implements 
IWorkflowExecuteRunnable {
         }
         // the end node of the branch of the dag
         if (parentNodeCode != null && 
dag.getEndNode().contains(parentNodeCode)) {
-            Optional<TaskInstance> existTaskInstanceOptional = 
getTaskInstance(parentNodeCode);
-            if (existTaskInstanceOptional.isPresent()) {
-                TaskInstance endTaskInstance = 
taskInstanceMap.get(existTaskInstanceOptional.get().getId());
-                String taskInstanceVarPool = endTaskInstance.getVarPool();
-                if (StringUtils.isNotEmpty(taskInstanceVarPool)) {
-                    Set<Property> taskProperties = new 
HashSet<>(JSONUtils.toList(taskInstanceVarPool, Property.class));
-                    String processInstanceVarPool = 
workflowInstance.getVarPool();
-                    List<Property> processGlobalParams =
-                            new 
ArrayList<>(JSONUtils.toList(workflowInstance.getGlobalParams(), 
Property.class));
-                    Map<String, Direct> oldProcessGlobalParamsMap = 
processGlobalParams.stream()
-                            .collect(Collectors.toMap(Property::getProp, 
Property::getDirect));
-                    Set<Property> processVarPoolOut = taskProperties.stream()
-                            .filter(property -> 
property.getDirect().equals(Direct.OUT)
-                                    && 
oldProcessGlobalParamsMap.containsKey(property.getProp())
-                                    && 
oldProcessGlobalParamsMap.get(property.getProp()).equals(Direct.OUT))
-                            .collect(Collectors.toSet());
-                    Set<Property> taskVarPoolIn =
-                            taskProperties.stream().filter(property -> 
property.getDirect().equals(Direct.IN))
-                                    .collect(Collectors.toSet());
-                    if (StringUtils.isNotEmpty(processInstanceVarPool)) {
-                        Set<Property> properties =
-                                new 
HashSet<>(JSONUtils.toList(processInstanceVarPool, Property.class));
-                        Set<String> newProcessVarPoolKeys =
-                                
taskProperties.stream().map(Property::getProp).collect(Collectors.toSet());
-                        properties = properties.stream()
-                                .filter(property -> 
!newProcessVarPoolKeys.contains(property.getProp()))
-                                .collect(Collectors.toSet());
-                        properties.addAll(processVarPoolOut);
-                        properties.addAll(taskVarPoolIn);
-
-                        
workflowInstance.setVarPool(JSONUtils.toJsonString(properties));
-                    } else {
-                        Set<Property> varPool = new HashSet<>();
-                        varPool.addAll(taskVarPoolIn);
-                        varPool.addAll(processVarPoolOut);
-                        
workflowInstance.setVarPool(JSONUtils.toJsonString(varPool));
-                    }
-                }
-            }
+            getTaskInstance(parentNodeCode)
+                    .ifPresent(endTaskInstance -> 
workflowInstance.setVarPool(VarPoolUtils.mergeVarPoolJsonString(
+                            Lists.newArrayList(workflowInstance.getVarPool(), 
endTaskInstance.getVarPool()))));
+
         }
 
         // if previous node success , post node submit
@@ -1907,14 +1814,8 @@ public class WorkflowExecuteRunnable implements 
IWorkflowExecuteRunnable {
                     continue;
                 }
             }
-            // init varPool only this task is the first time running
             if (task.isFirstRun()) {
-                // get pre task ,get all the task varPool to this task
-                // Do not use dag.getPreviousNodes because of the dag may be 
miss the upstream node
-                String preTasks = workflowExecuteContext.getWorkflowGraph()
-                        .getTaskNodeByCode(task.getTaskCode()).getPreTasks();
-                Set<Long> preTaskList = new 
HashSet<>(JSONUtils.toList(preTasks, Long.class));
-                getPreVarPool(task, preTaskList);
+                initializeTaskInstanceVarPool(task);
             }
             DependResult dependResult = getDependResultForTask(task);
             if (DependResult.SUCCESS == dependResult) {
@@ -2095,29 +1996,9 @@ public class WorkflowExecuteRunnable implements 
IWorkflowExecuteRunnable {
             taskInstanceDao.updateById(taskInstance);
         }
 
-        Set<String> removeSet = new HashSet<>();
-        for (TaskInstance taskInstance : removeTaskInstances) {
-            String taskVarPool = taskInstance.getVarPool();
-            if (StringUtils.isNotEmpty(taskVarPool)) {
-                List<Property> properties = JSONUtils.toList(taskVarPool, 
Property.class);
-                List<String> keys = properties.stream()
-                        .filter(property -> 
property.getDirect().equals(Direct.OUT))
-                        .map(property -> String.format("%s_%s", 
property.getProp(), property.getType()))
-                        .collect(Collectors.toList());
-                removeSet.addAll(keys);
-            }
-        }
-
-        // remove varPool data and update process instance
-        // TODO: we can remove this snippet if : we get varPool from pre 
taskInstance instead of process instance when
-        // task can not get pre task from incomplete dag
-        List<Property> processProperties = 
JSONUtils.toList(workflowInstance.getVarPool(), Property.class);
-        processProperties = processProperties.stream()
-                .filter(property -> !(property.getDirect().equals(Direct.IN)
-                        && removeSet.contains(String.format("%s_%s", 
property.getProp(), property.getType()))))
-                .collect(Collectors.toList());
-
-        workflowInstance.setVarPool(JSONUtils.toJsonString(processProperties));
+        workflowInstance.setVarPool(
+                VarPoolUtils.subtractVarPoolJson(workflowInstance.getVarPool(),
+                        
removeTaskInstances.stream().map(TaskInstance::getVarPool).collect(Collectors.toList())));
         processInstanceDao.updateById(workflowInstance);
 
         // remove task instance from taskInstanceMap,taskCodeInstanceMap , 
completeTaskSet, validTaskMap, errorTaskMap
@@ -2154,25 +2035,4 @@ public class WorkflowExecuteRunnable implements 
IWorkflowExecuteRunnable {
         }
     }
 
-    private void mergeTaskInstanceVarPool(TaskInstance taskInstance) {
-        String taskVarPoolJson = taskInstance.getVarPool();
-        if (StringUtils.isEmpty(taskVarPoolJson)) {
-            return;
-        }
-        ProcessInstance workflowInstance = 
workflowExecuteContext.getWorkflowInstance();
-        String processVarPoolJson = workflowInstance.getVarPool();
-        if (StringUtils.isEmpty(processVarPoolJson)) {
-            workflowInstance.setVarPool(taskVarPoolJson);
-            return;
-        }
-        List<Property> processVarPool = new 
ArrayList<>(JSONUtils.toList(processVarPoolJson, Property.class));
-        List<Property> taskVarPool = JSONUtils.toList(taskVarPoolJson, 
Property.class);
-        Set<String> newProcessVarPoolKeys = 
taskVarPool.stream().map(Property::getProp).collect(Collectors.toSet());
-        processVarPool = processVarPool.stream().filter(property -> 
!newProcessVarPoolKeys.contains(property.getProp()))
-                .collect(Collectors.toList());
-
-        processVarPool.addAll(taskVarPool);
-
-        workflowInstance.setVarPool(JSONUtils.toJsonString(processVarPool));
-    }
 }
diff --git 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java
 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java
index c08fb206f4..409f1b7691 100644
--- 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java
+++ 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java
@@ -71,6 +71,7 @@ import org.mockito.junit.jupiter.MockitoSettings;
 import org.mockito.quality.Strictness;
 import org.springframework.context.ApplicationContext;
 
+import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
 @ExtendWith(MockitoExtension.class)
@@ -106,6 +107,8 @@ public class WorkflowExecuteRunnableTest {
 
     private TaskGroupCoordinator taskGroupCoordinator;
 
+    private WorkflowExecuteContext workflowExecuteContext;
+
     @BeforeEach
     public void init() throws Exception {
         applicationContext = Mockito.mock(ApplicationContext.class);
@@ -134,7 +137,7 @@ public class WorkflowExecuteRunnableTest {
         stateWheelExecuteThread = Mockito.mock(StateWheelExecuteThread.class);
         curingGlobalParamsService = Mockito.mock(CuringParamsService.class);
         ProcessAlertManager processAlertManager = 
Mockito.mock(ProcessAlertManager.class);
-        WorkflowExecuteContext workflowExecuteContext = 
Mockito.mock(WorkflowExecuteContext.class);
+        workflowExecuteContext = Mockito.mock(WorkflowExecuteContext.class);
         
Mockito.when(workflowExecuteContext.getWorkflowInstance()).thenReturn(processInstance);
         IWorkflowGraph workflowGraph = Mockito.mock(IWorkflowGraph.class);
         
Mockito.when(workflowExecuteContext.getWorkflowGraph()).thenReturn(workflowGraph);
@@ -209,11 +212,13 @@ public class WorkflowExecuteRunnableTest {
     }
 
     @Test
-    public void testGetPreVarPool() {
+    public void testInitializeTaskInstanceVarPool() {
         try {
-            Set<Long> preTaskName = new HashSet<>();
-            preTaskName.add(1L);
-            preTaskName.add(2L);
+            IWorkflowGraph workflowGraph = Mockito.mock(IWorkflowGraph.class);
+            
Mockito.when(workflowExecuteContext.getWorkflowGraph()).thenReturn(workflowGraph);
+            TaskNode taskNode = Mockito.mock(TaskNode.class);
+            
Mockito.when(workflowGraph.getTaskNodeByCode(Mockito.anyLong())).thenReturn(taskNode);
+            
Mockito.when(taskNode.getPreTasks()).thenReturn(JSONUtils.toJsonString(Lists.newArrayList(1L,
 2L)));
 
             TaskInstance taskInstance = new TaskInstance();
 
@@ -255,7 +260,7 @@ public class WorkflowExecuteRunnableTest {
             taskCodeInstanceMapField.setAccessible(true);
             taskCodeInstanceMapField.set(workflowExecuteThread, 
taskCodeInstanceMap);
 
-            workflowExecuteThread.getPreVarPool(taskInstance, preTaskName);
+            workflowExecuteThread.initializeTaskInstanceVarPool(taskInstance);
             Assertions.assertNotNull(taskInstance.getVarPool());
 
             
taskInstance2.setVarPool("[{\"direct\":\"OUT\",\"prop\":\"test1\",\"type\":\"VARCHAR\",\"value\":\"2\"}]");
@@ -266,7 +271,7 @@ public class WorkflowExecuteRunnableTest {
             taskInstanceMapField.setAccessible(true);
             taskInstanceMapField.set(workflowExecuteThread, taskInstanceMap);
 
-            workflowExecuteThread.getPreVarPool(taskInstance, preTaskName);
+            workflowExecuteThread.initializeTaskInstanceVarPool(taskInstance);
             Assertions.assertNotNull(taskInstance.getVarPool());
         } catch (Exception e) {
             Assertions.fail();
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/model/Property.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/model/Property.java
index bac4e651df..f2da8ac40a 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/model/Property.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/model/Property.java
@@ -23,6 +23,9 @@ import 
org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
 import java.io.Serializable;
 import java.util.Objects;
 
+import lombok.Data;
+
+@Data
 public class Property implements Serializable {
 
     private static final long serialVersionUID = -4045513703397452451L;
@@ -56,62 +59,6 @@ public class Property implements Serializable {
         this.value = value;
     }
 
-    /**
-     * getter method
-     *
-     * @return the prop
-     * @see Property#prop
-     */
-    public String getProp() {
-        return prop;
-    }
-
-    /**
-     * setter method
-     *
-     * @param prop the prop to set
-     * @see Property#prop
-     */
-    public void setProp(String prop) {
-        this.prop = prop;
-    }
-
-    /**
-     * getter method
-     *
-     * @return the value
-     * @see Property#value
-     */
-    public String getValue() {
-        return value;
-    }
-
-    /**
-     * setter method
-     *
-     * @param value the value to set
-     * @see Property#value
-     */
-    public void setValue(String value) {
-        this.value = value;
-    }
-
-    public Direct getDirect() {
-        return direct;
-    }
-
-    public void setDirect(Direct direct) {
-        this.direct = direct;
-    }
-
-    public DataType getType() {
-        return type;
-    }
-
-    public void setType(DataType type) {
-        this.type = type;
-    }
-
     @Override
     public boolean equals(Object o) {
         if (this == o) {
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/AbstractParameters.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/AbstractParameters.java
index f11a83bc54..f99578d743 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/AbstractParameters.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/AbstractParameters.java
@@ -25,6 +25,7 @@ import 
org.apache.dolphinscheduler.plugin.task.api.model.Property;
 import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
 import 
org.apache.dolphinscheduler.plugin.task.api.parameters.resource.DataSourceParameters;
 import 
org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper;
+import org.apache.dolphinscheduler.plugin.task.api.utils.VarPoolUtils;
 
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.collections4.MapUtils;
@@ -35,6 +36,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.stream.Collectors;
 
 import lombok.Getter;
 import lombok.Setter;
@@ -42,6 +44,7 @@ import lombok.extern.slf4j.Slf4j;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.google.common.collect.Lists;
 
 @Getter
 @Slf4j
@@ -82,6 +85,7 @@ public abstract class AbstractParameters implements 
IParameters {
 
     /**
      * get input local parameters map if the param direct is IN
+     *
      * @return parameters map
      */
     public Map<String, Property> getInputLocalParametersMap() {
@@ -121,44 +125,30 @@ public abstract class AbstractParameters implements 
IParameters {
     }
 
     public void dealOutParam(Map<String, String> taskOutputParams) {
-        if (CollectionUtils.isEmpty(localParams)) {
-            return;
-        }
         List<Property> outProperty = getOutProperty(localParams);
         if (CollectionUtils.isEmpty(outProperty)) {
             return;
         }
-        if (MapUtils.isEmpty(taskOutputParams)) {
-            outProperty.forEach(this::addPropertyToValPool);
-            return;
-        }
-
-        for (Property info : outProperty) {
-            String propValue = taskOutputParams.get(info.getProp());
-            if (StringUtils.isNotEmpty(propValue)) {
-                info.setValue(propValue);
-                addPropertyToValPool(info);
-                continue;
-            }
-            addPropertyToValPool(info);
-            if (StringUtils.isEmpty(info.getValue())) {
-                log.warn("The output parameter {} value is empty and cannot 
find the out parameter from task output",
-                        info);
+        if (CollectionUtils.isNotEmpty(outProperty) && 
MapUtils.isNotEmpty(taskOutputParams)) {
+            // Inject the value
+            for (Property info : outProperty) {
+                String value = taskOutputParams.get(info.getProp());
+                if (value != null) {
+                    info.setValue(value);
+                }
             }
         }
+
+        varPool = VarPoolUtils.mergeVarPool(Lists.newArrayList(varPool, 
outProperty));
     }
 
-    public List<Property> getOutProperty(List<Property> params) {
+    protected List<Property> getOutProperty(List<Property> params) {
         if (CollectionUtils.isEmpty(params)) {
             return new ArrayList<>();
         }
-        List<Property> result = new ArrayList<>();
-        for (Property info : params) {
-            if (info.getDirect() == Direct.OUT) {
-                result.add(info);
-            }
-        }
-        return result;
+        return params.stream()
+                .filter(info -> info.getDirect() == Direct.OUT)
+                .collect(Collectors.toList());
     }
 
     public List<Map<String, String>> getListMapByString(String json) {
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/SqlParameters.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/SqlParameters.java
index 0f1a893a30..75ebe6c9cd 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/SqlParameters.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/SqlParameters.java
@@ -27,6 +27,7 @@ import 
org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
 import 
org.apache.dolphinscheduler.plugin.task.api.parameters.resource.DataSourceParameters;
 import 
org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper;
 import 
org.apache.dolphinscheduler.plugin.task.api.parameters.resource.UdfFuncParameters;
+import org.apache.dolphinscheduler.plugin.task.api.utils.VarPoolUtils;
 
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -40,6 +41,7 @@ import java.util.stream.Collectors;
 
 import com.google.common.base.Enums;
 import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
 
 /**
  * Sql/Hql parameter
@@ -245,7 +247,7 @@ public class SqlParameters extends AbstractParameters {
             return;
         }
         if (StringUtils.isEmpty(result)) {
-            varPool.addAll(outProperty);
+            varPool = VarPoolUtils.mergeVarPool(Lists.newArrayList(varPool, 
outProperty));
             return;
         }
         List<Map<String, String>> sqlResult = getListMapByString(result);
@@ -268,7 +270,6 @@ public class SqlParameters extends AbstractParameters {
             for (Property info : outProperty) {
                 if (info.getType() == DataType.LIST) {
                     
info.setValue(JSONUtils.toJsonString(sqlResultFormat.get(info.getProp())));
-                    varPool.add(info);
                 }
             }
         } else {
@@ -276,9 +277,9 @@ public class SqlParameters extends AbstractParameters {
             Map<String, String> firstRow = sqlResult.get(0);
             for (Property info : outProperty) {
                 info.setValue(String.valueOf(firstRow.get(info.getProp())));
-                varPool.add(info);
             }
         }
+        varPool = VarPoolUtils.mergeVarPool(Lists.newArrayList(varPool, 
outProperty));
 
     }
 
@@ -322,6 +323,7 @@ public class SqlParameters extends AbstractParameters {
 
     /**
      * TODO SQLTaskExecutionContext needs to be optimized
+     *
      * @param parametersHelper
      * @return
      */
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/VarPoolUtils.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/VarPoolUtils.java
new file mode 100644
index 0000000000..7c24eb9a21
--- /dev/null
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/VarPoolUtils.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.api.utils;
+
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
+import org.apache.dolphinscheduler.plugin.task.api.model.Property;
+
+import org.apache.commons.collections4.CollectionUtils;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import lombok.experimental.UtilityClass;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+@UtilityClass
+public class VarPoolUtils {
+
+    public List<Property> deserializeVarPool(String varPoolJson) {
+        return JSONUtils.toList(varPoolJson, Property.class);
+    }
+
+    /**
+     * @see #mergeVarPool(List)
+     */
+    public String mergeVarPoolJsonString(List<String> varPoolJsons) {
+        if (CollectionUtils.isEmpty(varPoolJsons)) {
+            return null;
+        }
+        List<List<Property>> varPools = varPoolJsons.stream()
+                .map(VarPoolUtils::deserializeVarPool)
+                .collect(Collectors.toList());
+        List<Property> finalVarPool = mergeVarPool(varPools);
+        return JSONUtils.toJsonString(finalVarPool);
+    }
+
+    /**
+     * Merge the given varpools in list order, and return the merged varpool.
+     * If several varpools contain a property with the same key 
({@link Property#getProp()}), the property from the later varpool is used. 
Only properties whose {@link Property#getDirect()} is OUT are merged.
+     * // todo: we may need to consider the datatype of the property
+     */
+    public List<Property> mergeVarPool(List<List<Property>> varPools) {
+        if (CollectionUtils.isEmpty(varPools)) {
+            return null;
+        }
+        if (varPools.size() == 1) {
+            return varPools.get(0);
+        }
+        Map<String, Property> result = new HashMap<>();
+        for (List<Property> varPool : varPools) {
+            if (CollectionUtils.isEmpty(varPool)) {
+                continue;
+            }
+            for (Property property : varPool) {
+                if (!Direct.OUT.equals(property.getDirect())) {
+                    log.info("The direct should be OUT in varPool, but got 
{}", property.getDirect());
+                    continue;
+                }
+                result.put(property.getProp(), property);
+            }
+        }
+        return new ArrayList<>(result.values());
+    }
+
+    public String subtractVarPoolJson(String varPool, List<String> 
subtractVarPool) {
+        List<Property> varPoolList = deserializeVarPool(varPool);
+        List<List<Property>> subtractVarPoolList = subtractVarPool.stream()
+                .map(VarPoolUtils::deserializeVarPool)
+                .collect(Collectors.toList());
+        List<Property> finalVarPool = subtractVarPool(varPoolList, 
subtractVarPoolList);
+        return JSONUtils.toJsonString(finalVarPool);
+    }
+
+    /**
+     * Return the properties of varPool whose key is not present in any of 
the subtractVarPool lists.
+     */
+    public List<Property> subtractVarPool(List<Property> varPool, 
List<List<Property>> subtractVarPool) {
+        if (CollectionUtils.isEmpty(varPool)) {
+            return null;
+        }
+        if (CollectionUtils.isEmpty(subtractVarPool)) {
+            return varPool;
+        }
+        Map<String, Property> subtractVarPoolMap = new HashMap<>();
+        for (List<Property> properties : subtractVarPool) {
+            for (Property property : properties) {
+                subtractVarPoolMap.put(property.getProp(), property);
+            }
+        }
+        List<Property> result = new ArrayList<>();
+        for (Property property : varPool) {
+            if (!subtractVarPoolMap.containsKey(property.getProp())) {
+                result.add(property);
+            }
+        }
+        return result;
+    }
+
+}
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/utils/VarPoolUtilsTest.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/utils/VarPoolUtilsTest.java
new file mode 100644
index 0000000000..231d97029f
--- /dev/null
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/utils/VarPoolUtilsTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.api.utils;
+
+import org.apache.dolphinscheduler.plugin.task.api.enums.DataType;
+import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
+import org.apache.dolphinscheduler.plugin.task.api.model.Property;
+
+import java.util.List;
+
+import org.junit.jupiter.api.Test;
+
+import com.google.common.collect.Lists;
+import com.google.common.truth.Truth;
+
+class VarPoolUtilsTest {
+
+    @Test
+    void mergeVarPool() {
+        Truth.assertThat(VarPoolUtils.mergeVarPool(null)).isNull();
+
+        // Override the value of the same property
+        // Merge the property with different key.
+        List<Property> varpool1 = Lists.newArrayList(new Property("name", 
Direct.OUT, DataType.VARCHAR, "tom"));
+        List<Property> varpool2 = Lists.newArrayList(
+                new Property("name", Direct.OUT, DataType.VARCHAR, "tim"),
+                new Property("age", Direct.OUT, DataType.INTEGER, "10"));
+
+        
Truth.assertThat(VarPoolUtils.mergeVarPool(Lists.newArrayList(varpool1, 
varpool2)))
+                .containsExactly(
+                        new Property("name", Direct.OUT, DataType.VARCHAR, 
"tim"),
+                        new Property("age", Direct.OUT, DataType.INTEGER, 
"10"));
+
+    }
+
+    @Test
+    void subtractVarPool() {
+        Truth.assertThat(VarPoolUtils.subtractVarPool(null, null)).isNull();
+        List<Property> varpool1 = Lists.newArrayList(new Property("name", 
Direct.OUT, DataType.VARCHAR, "tom"),
+                new Property("age", Direct.OUT, DataType.INTEGER, "10"));
+        List<Property> varpool2 = Lists.newArrayList(new Property("name", 
Direct.OUT, DataType.VARCHAR, "tom"));
+        List<Property> varpool3 = Lists.newArrayList(new Property("location", 
Direct.OUT, DataType.VARCHAR, "china"));
+
+        Truth.assertThat(VarPoolUtils.subtractVarPool(varpool1, 
Lists.newArrayList(varpool2, varpool3)))
+                .containsExactly(new Property("age", Direct.OUT, 
DataType.INTEGER, "10"));
+    }
+}
diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
index 8dd842a8ed..797d8f78a3 100644
--- 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
+++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
@@ -25,6 +25,7 @@ import 
org.apache.dolphinscheduler.common.thread.DefaultUncaughtExceptionHandler
 import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.meter.metrics.MetricsProvider;
 import org.apache.dolphinscheduler.meter.metrics.SystemMetrics;
+import 
org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceProcessorProvider;
 import org.apache.dolphinscheduler.plugin.storage.api.StorageConfiguration;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
@@ -86,6 +87,7 @@ public class WorkerServer implements IStoppable {
     public void run() {
         this.workerRpcServer.start();
         TaskPluginManager.loadPlugin();
+        DataSourceProcessorProvider.initialize();
 
         this.workerRegistryClient.setRegistryStoppable(this);
         this.workerRegistryClient.start();
diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutor.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutor.java
index 41cef62028..e64a421001 100644
--- 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutor.java
+++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutor.java
@@ -283,6 +283,7 @@ public abstract class WorkerTaskExecutor implements 
Runnable {
 
         // upload out files and modify the "OUT FILE" property in VarPool
         TaskFilesTransferUtils.uploadOutputFiles(taskExecutionContext, 
storageOperate);
+
         log.info("Upload output files: {} successfully",
                 
TaskFilesTransferUtils.getFileLocalParams(taskExecutionContext, Direct.OUT));
 
diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskFilesTransferUtils.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskFilesTransferUtils.java
index 87438c614b..f060c0a17d 100644
--- 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskFilesTransferUtils.java
+++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskFilesTransferUtils.java
@@ -70,17 +70,18 @@ public class TaskFilesTransferUtils {
      */
     public static void uploadOutputFiles(TaskExecutionContext 
taskExecutionContext,
                                          StorageOperate storageOperate) throws 
TaskException {
-        List<Property> varPools = getVarPools(taskExecutionContext);
-        // get map of varPools for quick search
-        Map<String, Property> varPoolsMap = 
varPools.stream().collect(Collectors.toMap(Property::getProp, x -> x));
-
         // get OUTPUT FILE parameters
         List<Property> localParamsProperty = 
getFileLocalParams(taskExecutionContext, Direct.OUT);
-
         if (localParamsProperty.isEmpty()) {
             return;
         }
 
+        List<Property> varPools = getVarPools(taskExecutionContext);
+        // get map of varPools for quick search
+        Map<String, Property> varPoolsMap = varPools.stream()
+                .filter(property -> Direct.OUT.equals(property.getDirect()))
+                .collect(Collectors.toMap(Property::getProp, x -> x));
+
         log.info("Upload output files ...");
         for (Property property : localParamsProperty) {
             // get local file path
@@ -137,10 +138,6 @@ public class TaskFilesTransferUtils {
      * @throws TaskException task exception
      */
     public static void downloadUpstreamFiles(TaskExecutionContext 
taskExecutionContext, StorageOperate storageOperate) {
-        List<Property> varPools = getVarPools(taskExecutionContext);
-        // get map of varPools for quick search
-        Map<String, Property> varPoolsMap = 
varPools.stream().collect(Collectors.toMap(Property::getProp, x -> x));
-
         // get "IN FILE" parameters
         List<Property> localParamsProperty = 
getFileLocalParams(taskExecutionContext, Direct.IN);
 
@@ -148,6 +145,13 @@ public class TaskFilesTransferUtils {
             return;
         }
 
+        List<Property> varPools = getVarPools(taskExecutionContext);
+        // get map of varPools for quick search
+        Map<String, Property> varPoolsMap = varPools
+                .stream()
+                .filter(property -> Direct.IN.equals(property.getDirect()))
+                .collect(Collectors.toMap(Property::getProp, x -> x));
+
         String executePath = taskExecutionContext.getExecutePath();
         // data path to download packaged data
         String downloadTmpPath = String.format("%s/%s", executePath, 
DOWNLOAD_TMP);

Reply via email to