This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 61915a2d5d Fix workflow instance restart failed due to duplicate key in varpool (#16001)
61915a2d5d is described below
commit 61915a2d5dfbb0470d536d8954529029e0fbf120
Author: Wenjun Ruan <[email protected]>
AuthorDate: Thu May 16 11:32:27 2024 +0800
Fix workflow instance restart failed due to duplicate key in varpool (#16001)
---
.../dolphinscheduler/api/ApiApplicationServer.java | 2 +
.../api/service/impl/TaskInstanceServiceImpl.java | 13 +-
.../api/plugin/DataSourceProcessorProvider.java | 4 +
.../server/master/MasterServer.java | 2 +
.../master/runner/WorkflowExecuteRunnable.java | 200 ++++-----------------
.../master/runner/WorkflowExecuteRunnableTest.java | 19 +-
.../plugin/task/api/model/Property.java | 59 +-----
.../task/api/parameters/AbstractParameters.java | 44 ++---
.../plugin/task/api/parameters/SqlParameters.java | 8 +-
.../plugin/task/api/utils/VarPoolUtils.java | 119 ++++++++++++
.../plugin/task/api/utils/VarPoolUtilsTest.java | 62 +++++++
.../server/worker/WorkerServer.java | 2 +
.../server/worker/runner/WorkerTaskExecutor.java | 1 +
.../worker/utils/TaskFilesTransferUtils.java | 22 ++-
14 files changed, 281 insertions(+), 276 deletions(-)
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java
index 6f7d8f43d2..f8b705179a 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java
@@ -24,6 +24,7 @@ import
org.apache.dolphinscheduler.common.thread.DefaultUncaughtExceptionHandler
import org.apache.dolphinscheduler.dao.DaoConfiguration;
import org.apache.dolphinscheduler.dao.PluginDao;
import org.apache.dolphinscheduler.dao.entity.PluginDefine;
+import
org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceProcessorProvider;
import org.apache.dolphinscheduler.plugin.storage.api.StorageConfiguration;
import org.apache.dolphinscheduler.plugin.task.api.TaskChannelFactory;
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
@@ -69,6 +70,7 @@ public class ApiApplicationServer {
log.info("Received spring application context ready event will load
taskPlugin and write to DB");
// install task plugin
TaskPluginManager.loadPlugin();
+ DataSourceProcessorProvider.initialize();
for (Map.Entry<String, TaskChannelFactory> entry :
TaskPluginManager.getTaskChannelFactoryMap().entrySet()) {
String taskPluginName = entry.getKey();
TaskChannelFactory taskChannelFactory = entry.getValue();
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
index 7469d8db13..49e42da561 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
@@ -367,10 +367,15 @@ public class TaskInstanceServiceImpl extends
BaseServiceImpl implements TaskInst
}
for (TaskInstance taskInstance : needToDeleteTaskInstances) {
if (StringUtils.isNotBlank(taskInstance.getLogPath())) {
- ILogService iLogService =
-
SingletonJdkDynamicRpcClientProxyFactory.getProxyClient(taskInstance.getHost(),
- ILogService.class);
- iLogService.removeTaskInstanceLog(taskInstance.getLogPath());
+ try {
+ // Remove task instance log failed will not affect the
deletion of task instance
+ ILogService iLogService =
+
SingletonJdkDynamicRpcClientProxyFactory.getProxyClient(taskInstance.getHost(),
+ ILogService.class);
+
iLogService.removeTaskInstanceLog(taskInstance.getLogPath());
+ } catch (Exception ex) {
+ log.error("Remove task instance log error", ex);
+ }
}
}
diff --git
a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourceProcessorProvider.java
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourceProcessorProvider.java
index 751ac1ba08..973421615f 100644
---
a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourceProcessorProvider.java
+++
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourceProcessorProvider.java
@@ -37,6 +37,10 @@ public class DataSourceProcessorProvider {
private DataSourceProcessorProvider() {
}
+ public static void initialize() {
+ log.info("Initialize DataSourceProcessorProvider");
+ }
+
public static DataSourceProcessor getDataSourceProcessor(@NonNull DbType
dbType) {
return
dataSourcePluginManager.getDataSourceProcessorMap().get(dbType.name());
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
index 7988570135..9b507500b9 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
@@ -26,6 +26,7 @@ import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.dao.DaoConfiguration;
import org.apache.dolphinscheduler.meter.metrics.MetricsProvider;
import org.apache.dolphinscheduler.meter.metrics.SystemMetrics;
+import
org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceProcessorProvider;
import org.apache.dolphinscheduler.plugin.storage.api.StorageConfiguration;
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
import org.apache.dolphinscheduler.registry.api.RegistryConfiguration;
@@ -108,6 +109,7 @@ public class MasterServer implements IStoppable {
// install task plugin
TaskPluginManager.loadPlugin();
+ DataSourceProcessorProvider.initialize();
this.masterSlotManager.start();
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index 725b7e000a..72c52922d7 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -57,11 +57,10 @@ import
org.apache.dolphinscheduler.extract.worker.ITaskInstanceOperator;
import
org.apache.dolphinscheduler.extract.worker.transportor.UpdateWorkflowHostRequest;
import
org.apache.dolphinscheduler.extract.worker.transportor.UpdateWorkflowHostResponse;
import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
-import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
-import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.parameters.SwitchParameters;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
+import org.apache.dolphinscheduler.plugin.task.api.utils.VarPoolUtils;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.event.StateEvent;
import org.apache.dolphinscheduler.server.master.event.StateEventHandleError;
@@ -97,6 +96,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
@@ -376,7 +376,8 @@ public class WorkflowExecuteRunnable implements
IWorkflowExecuteRunnable {
if (taskInstance.getState().isSuccess()) {
completeTaskSet.add(taskInstance.getTaskCode());
- mergeTaskInstanceVarPool(taskInstance);
+
workflowInstance.setVarPool(VarPoolUtils.mergeVarPoolJsonString(
+ Lists.newArrayList(workflowInstance.getVarPool(),
taskInstance.getVarPool())));
processInstanceDao.upsertProcessInstance(workflowInstance);
ProjectUser projectUser =
processService.queryProjectWithUserByProcessInstanceId(workflowInstance.getId());
@@ -441,7 +442,6 @@ public class WorkflowExecuteRunnable implements
IWorkflowExecuteRunnable {
/**
* crate new task instance to retry, different objects from the original
- *
*/
private void retryTaskInstance(TaskInstance taskInstance) throws
StateEventHandleException {
ProcessInstance workflowInstance =
workflowExecuteContext.getWorkflowInstance();
@@ -532,16 +532,6 @@ public class WorkflowExecuteRunnable implements
IWorkflowExecuteRunnable {
}
}
- /**
- * check if task instance exist by id
- */
- public boolean checkTaskInstanceById(int taskInstanceId) {
- if (taskInstanceMap.isEmpty()) {
- return false;
- }
- return taskInstanceMap.containsKey(taskInstanceId);
- }
-
/**
* get task instance from memory
*/
@@ -1070,7 +1060,7 @@ public class WorkflowExecuteRunnable implements
IWorkflowExecuteRunnable {
* new a taskInstance
*
* @param processInstance process instance
- * @param taskNode task node
+ * @param taskNode task node
* @return task instance
*/
public TaskInstance newTaskInstance(ProcessInstance processInstance,
TaskNode taskNode) {
@@ -1161,80 +1151,32 @@ public class WorkflowExecuteRunnable implements
IWorkflowExecuteRunnable {
return taskInstance;
}
- public void getPreVarPool(TaskInstance taskInstance, Set<Long> preTask) {
+ void initializeTaskInstanceVarPool(TaskInstance taskInstance) {
+ // get pre task ,get all the task varPool to this task
+ // Do not use dag.getPreviousNodes because of the dag may be miss the
upstream node
+ String preTasks =
+
workflowExecuteContext.getWorkflowGraph().getTaskNodeByCode(taskInstance.getTaskCode()).getPreTasks();
+ Set<Long> preTaskList = new HashSet<>(JSONUtils.toList(preTasks,
Long.class));
ProcessInstance workflowInstance =
workflowExecuteContext.getWorkflowInstance();
- Map<String, Property> allProperty = new HashMap<>();
- Map<String, TaskInstance> allTaskInstance = new HashMap<>();
- if (CollectionUtils.isNotEmpty(preTask)) {
- for (Long preTaskCode : preTask) {
- Optional<TaskInstance> existTaskInstanceOptional =
getTaskInstance(preTaskCode);
- if (!existTaskInstanceOptional.isPresent()) {
- continue;
- }
- Integer taskId = existTaskInstanceOptional.get().getId();
- if (taskId == null) {
- continue;
- }
- TaskInstance preTaskInstance = taskInstanceMap.get(taskId);
- if (preTaskInstance == null) {
- continue;
- }
- String preVarPool = preTaskInstance.getVarPool();
- if (StringUtils.isNotEmpty(preVarPool)) {
- List<Property> properties = JSONUtils.toList(preVarPool,
Property.class);
- for (Property info : properties) {
- setVarPoolValue(allProperty, allTaskInstance,
preTaskInstance, info);
- }
- }
- }
- if (allProperty.size() > 0) {
-
taskInstance.setVarPool(JSONUtils.toJsonString(allProperty.values()));
- }
- } else {
- if (StringUtils.isNotEmpty(workflowInstance.getVarPool())) {
- taskInstance.setVarPool(workflowInstance.getVarPool());
- }
+ if (CollectionUtils.isEmpty(preTaskList)) {
+ taskInstance.setVarPool(workflowInstance.getVarPool());
+ return;
}
+ List<String> preTaskInstanceVarPools = preTaskList
+ .stream()
+ .map(taskCode -> getTaskInstance(taskCode).orElse(null))
+ .filter(Objects::nonNull)
+ .sorted(Comparator.comparing(TaskInstance::getEndTime))
+ .map(TaskInstance::getVarPool)
+ .collect(Collectors.toList());
+
taskInstance.setVarPool(VarPoolUtils.mergeVarPoolJsonString(preTaskInstanceVarPools));
}
public Collection<TaskInstance> getAllTaskInstances() {
return taskInstanceMap.values();
}
- private void setVarPoolValue(Map<String, Property> allProperty,
- Map<String, TaskInstance> allTaskInstance,
- TaskInstance preTaskInstance, Property
thisProperty) {
- // for this taskInstance all the param in this part is IN.
- thisProperty.setDirect(Direct.IN);
- // get the pre taskInstance Property's name
- String proName = thisProperty.getProp();
- // if the Previous nodes have the Property of same name
- if (allProperty.containsKey(proName)) {
- // comparison the value of two Property
- Property otherPro = allProperty.get(proName);
- // if this property'value of loop is empty,use the other,whether
the other's value is empty or not
- if (StringUtils.isEmpty(thisProperty.getValue())) {
- allProperty.put(proName, otherPro);
- // if property'value of loop is not empty,and the other's
value is not empty too, use the earlier value
- } else if (StringUtils.isNotEmpty(otherPro.getValue())) {
- TaskInstance otherTask = allTaskInstance.get(proName);
- if (otherTask.getEndTime().getTime() >
preTaskInstance.getEndTime().getTime()) {
- allProperty.put(proName, thisProperty);
- allTaskInstance.put(proName, preTaskInstance);
- } else {
- allProperty.put(proName, otherPro);
- }
- } else {
- allProperty.put(proName, thisProperty);
- allTaskInstance.put(proName, preTaskInstance);
- }
- } else {
- allProperty.put(proName, thisProperty);
- allTaskInstance.put(proName, preTaskInstance);
- }
- }
-
/**
* get complete task instance map, taskCode as key
*/
@@ -1311,45 +1253,10 @@ public class WorkflowExecuteRunnable implements
IWorkflowExecuteRunnable {
}
// the end node of the branch of the dag
if (parentNodeCode != null &&
dag.getEndNode().contains(parentNodeCode)) {
- Optional<TaskInstance> existTaskInstanceOptional =
getTaskInstance(parentNodeCode);
- if (existTaskInstanceOptional.isPresent()) {
- TaskInstance endTaskInstance =
taskInstanceMap.get(existTaskInstanceOptional.get().getId());
- String taskInstanceVarPool = endTaskInstance.getVarPool();
- if (StringUtils.isNotEmpty(taskInstanceVarPool)) {
- Set<Property> taskProperties = new
HashSet<>(JSONUtils.toList(taskInstanceVarPool, Property.class));
- String processInstanceVarPool =
workflowInstance.getVarPool();
- List<Property> processGlobalParams =
- new
ArrayList<>(JSONUtils.toList(workflowInstance.getGlobalParams(),
Property.class));
- Map<String, Direct> oldProcessGlobalParamsMap =
processGlobalParams.stream()
- .collect(Collectors.toMap(Property::getProp,
Property::getDirect));
- Set<Property> processVarPoolOut = taskProperties.stream()
- .filter(property ->
property.getDirect().equals(Direct.OUT)
- &&
oldProcessGlobalParamsMap.containsKey(property.getProp())
- &&
oldProcessGlobalParamsMap.get(property.getProp()).equals(Direct.OUT))
- .collect(Collectors.toSet());
- Set<Property> taskVarPoolIn =
- taskProperties.stream().filter(property ->
property.getDirect().equals(Direct.IN))
- .collect(Collectors.toSet());
- if (StringUtils.isNotEmpty(processInstanceVarPool)) {
- Set<Property> properties =
- new
HashSet<>(JSONUtils.toList(processInstanceVarPool, Property.class));
- Set<String> newProcessVarPoolKeys =
-
taskProperties.stream().map(Property::getProp).collect(Collectors.toSet());
- properties = properties.stream()
- .filter(property ->
!newProcessVarPoolKeys.contains(property.getProp()))
- .collect(Collectors.toSet());
- properties.addAll(processVarPoolOut);
- properties.addAll(taskVarPoolIn);
-
-
workflowInstance.setVarPool(JSONUtils.toJsonString(properties));
- } else {
- Set<Property> varPool = new HashSet<>();
- varPool.addAll(taskVarPoolIn);
- varPool.addAll(processVarPoolOut);
-
workflowInstance.setVarPool(JSONUtils.toJsonString(varPool));
- }
- }
- }
+ getTaskInstance(parentNodeCode)
+ .ifPresent(endTaskInstance ->
workflowInstance.setVarPool(VarPoolUtils.mergeVarPoolJsonString(
+ Lists.newArrayList(workflowInstance.getVarPool(),
endTaskInstance.getVarPool()))));
+
}
// if previous node success , post node submit
@@ -1907,14 +1814,8 @@ public class WorkflowExecuteRunnable implements
IWorkflowExecuteRunnable {
continue;
}
}
- // init varPool only this task is the first time running
if (task.isFirstRun()) {
- // get pre task ,get all the task varPool to this task
- // Do not use dag.getPreviousNodes because of the dag may be
miss the upstream node
- String preTasks = workflowExecuteContext.getWorkflowGraph()
- .getTaskNodeByCode(task.getTaskCode()).getPreTasks();
- Set<Long> preTaskList = new
HashSet<>(JSONUtils.toList(preTasks, Long.class));
- getPreVarPool(task, preTaskList);
+ initializeTaskInstanceVarPool(task);
}
DependResult dependResult = getDependResultForTask(task);
if (DependResult.SUCCESS == dependResult) {
@@ -2095,29 +1996,9 @@ public class WorkflowExecuteRunnable implements
IWorkflowExecuteRunnable {
taskInstanceDao.updateById(taskInstance);
}
- Set<String> removeSet = new HashSet<>();
- for (TaskInstance taskInstance : removeTaskInstances) {
- String taskVarPool = taskInstance.getVarPool();
- if (StringUtils.isNotEmpty(taskVarPool)) {
- List<Property> properties = JSONUtils.toList(taskVarPool,
Property.class);
- List<String> keys = properties.stream()
- .filter(property ->
property.getDirect().equals(Direct.OUT))
- .map(property -> String.format("%s_%s",
property.getProp(), property.getType()))
- .collect(Collectors.toList());
- removeSet.addAll(keys);
- }
- }
-
- // remove varPool data and update process instance
- // TODO: we can remove this snippet if : we get varPool from pre
taskInstance instead of process instance when
- // task can not get pre task from incomplete dag
- List<Property> processProperties =
JSONUtils.toList(workflowInstance.getVarPool(), Property.class);
- processProperties = processProperties.stream()
- .filter(property -> !(property.getDirect().equals(Direct.IN)
- && removeSet.contains(String.format("%s_%s",
property.getProp(), property.getType()))))
- .collect(Collectors.toList());
-
- workflowInstance.setVarPool(JSONUtils.toJsonString(processProperties));
+ workflowInstance.setVarPool(
+ VarPoolUtils.subtractVarPoolJson(workflowInstance.getVarPool(),
+
removeTaskInstances.stream().map(TaskInstance::getVarPool).collect(Collectors.toList())));
processInstanceDao.updateById(workflowInstance);
// remove task instance from taskInstanceMap,taskCodeInstanceMap ,
completeTaskSet, validTaskMap, errorTaskMap
@@ -2154,25 +2035,4 @@ public class WorkflowExecuteRunnable implements
IWorkflowExecuteRunnable {
}
}
- private void mergeTaskInstanceVarPool(TaskInstance taskInstance) {
- String taskVarPoolJson = taskInstance.getVarPool();
- if (StringUtils.isEmpty(taskVarPoolJson)) {
- return;
- }
- ProcessInstance workflowInstance =
workflowExecuteContext.getWorkflowInstance();
- String processVarPoolJson = workflowInstance.getVarPool();
- if (StringUtils.isEmpty(processVarPoolJson)) {
- workflowInstance.setVarPool(taskVarPoolJson);
- return;
- }
- List<Property> processVarPool = new
ArrayList<>(JSONUtils.toList(processVarPoolJson, Property.class));
- List<Property> taskVarPool = JSONUtils.toList(taskVarPoolJson,
Property.class);
- Set<String> newProcessVarPoolKeys =
taskVarPool.stream().map(Property::getProp).collect(Collectors.toSet());
- processVarPool = processVarPool.stream().filter(property ->
!newProcessVarPoolKeys.contains(property.getProp()))
- .collect(Collectors.toList());
-
- processVarPool.addAll(taskVarPool);
-
- workflowInstance.setVarPool(JSONUtils.toJsonString(processVarPool));
- }
}
diff --git
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java
index c08fb206f4..409f1b7691 100644
---
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java
+++
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java
@@ -71,6 +71,7 @@ import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
import org.springframework.context.ApplicationContext;
+import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
@ExtendWith(MockitoExtension.class)
@@ -106,6 +107,8 @@ public class WorkflowExecuteRunnableTest {
private TaskGroupCoordinator taskGroupCoordinator;
+ private WorkflowExecuteContext workflowExecuteContext;
+
@BeforeEach
public void init() throws Exception {
applicationContext = Mockito.mock(ApplicationContext.class);
@@ -134,7 +137,7 @@ public class WorkflowExecuteRunnableTest {
stateWheelExecuteThread = Mockito.mock(StateWheelExecuteThread.class);
curingGlobalParamsService = Mockito.mock(CuringParamsService.class);
ProcessAlertManager processAlertManager =
Mockito.mock(ProcessAlertManager.class);
- WorkflowExecuteContext workflowExecuteContext =
Mockito.mock(WorkflowExecuteContext.class);
+ workflowExecuteContext = Mockito.mock(WorkflowExecuteContext.class);
Mockito.when(workflowExecuteContext.getWorkflowInstance()).thenReturn(processInstance);
IWorkflowGraph workflowGraph = Mockito.mock(IWorkflowGraph.class);
Mockito.when(workflowExecuteContext.getWorkflowGraph()).thenReturn(workflowGraph);
@@ -209,11 +212,13 @@ public class WorkflowExecuteRunnableTest {
}
@Test
- public void testGetPreVarPool() {
+ public void testInitializeTaskInstanceVarPool() {
try {
- Set<Long> preTaskName = new HashSet<>();
- preTaskName.add(1L);
- preTaskName.add(2L);
+ IWorkflowGraph workflowGraph = Mockito.mock(IWorkflowGraph.class);
+
Mockito.when(workflowExecuteContext.getWorkflowGraph()).thenReturn(workflowGraph);
+ TaskNode taskNode = Mockito.mock(TaskNode.class);
+
Mockito.when(workflowGraph.getTaskNodeByCode(Mockito.anyLong())).thenReturn(taskNode);
+
Mockito.when(taskNode.getPreTasks()).thenReturn(JSONUtils.toJsonString(Lists.newArrayList(1L,
2L)));
TaskInstance taskInstance = new TaskInstance();
@@ -255,7 +260,7 @@ public class WorkflowExecuteRunnableTest {
taskCodeInstanceMapField.setAccessible(true);
taskCodeInstanceMapField.set(workflowExecuteThread,
taskCodeInstanceMap);
- workflowExecuteThread.getPreVarPool(taskInstance, preTaskName);
+ workflowExecuteThread.initializeTaskInstanceVarPool(taskInstance);
Assertions.assertNotNull(taskInstance.getVarPool());
taskInstance2.setVarPool("[{\"direct\":\"OUT\",\"prop\":\"test1\",\"type\":\"VARCHAR\",\"value\":\"2\"}]");
@@ -266,7 +271,7 @@ public class WorkflowExecuteRunnableTest {
taskInstanceMapField.setAccessible(true);
taskInstanceMapField.set(workflowExecuteThread, taskInstanceMap);
- workflowExecuteThread.getPreVarPool(taskInstance, preTaskName);
+ workflowExecuteThread.initializeTaskInstanceVarPool(taskInstance);
Assertions.assertNotNull(taskInstance.getVarPool());
} catch (Exception e) {
Assertions.fail();
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/model/Property.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/model/Property.java
index bac4e651df..f2da8ac40a 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/model/Property.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/model/Property.java
@@ -23,6 +23,9 @@ import
org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
import java.io.Serializable;
import java.util.Objects;
+import lombok.Data;
+
+@Data
public class Property implements Serializable {
private static final long serialVersionUID = -4045513703397452451L;
@@ -56,62 +59,6 @@ public class Property implements Serializable {
this.value = value;
}
- /**
- * getter method
- *
- * @return the prop
- * @see Property#prop
- */
- public String getProp() {
- return prop;
- }
-
- /**
- * setter method
- *
- * @param prop the prop to set
- * @see Property#prop
- */
- public void setProp(String prop) {
- this.prop = prop;
- }
-
- /**
- * getter method
- *
- * @return the value
- * @see Property#value
- */
- public String getValue() {
- return value;
- }
-
- /**
- * setter method
- *
- * @param value the value to set
- * @see Property#value
- */
- public void setValue(String value) {
- this.value = value;
- }
-
- public Direct getDirect() {
- return direct;
- }
-
- public void setDirect(Direct direct) {
- this.direct = direct;
- }
-
- public DataType getType() {
- return type;
- }
-
- public void setType(DataType type) {
- this.type = type;
- }
-
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/AbstractParameters.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/AbstractParameters.java
index f11a83bc54..f99578d743 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/AbstractParameters.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/AbstractParameters.java
@@ -25,6 +25,7 @@ import
org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import
org.apache.dolphinscheduler.plugin.task.api.parameters.resource.DataSourceParameters;
import
org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper;
+import org.apache.dolphinscheduler.plugin.task.api.utils.VarPoolUtils;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
@@ -35,6 +36,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.stream.Collectors;
import lombok.Getter;
import lombok.Setter;
@@ -42,6 +44,7 @@ import lombok.extern.slf4j.Slf4j;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.google.common.collect.Lists;
@Getter
@Slf4j
@@ -82,6 +85,7 @@ public abstract class AbstractParameters implements
IParameters {
/**
* get input local parameters map if the param direct is IN
+ *
* @return parameters map
*/
public Map<String, Property> getInputLocalParametersMap() {
@@ -121,44 +125,30 @@ public abstract class AbstractParameters implements
IParameters {
}
public void dealOutParam(Map<String, String> taskOutputParams) {
- if (CollectionUtils.isEmpty(localParams)) {
- return;
- }
List<Property> outProperty = getOutProperty(localParams);
if (CollectionUtils.isEmpty(outProperty)) {
return;
}
- if (MapUtils.isEmpty(taskOutputParams)) {
- outProperty.forEach(this::addPropertyToValPool);
- return;
- }
-
- for (Property info : outProperty) {
- String propValue = taskOutputParams.get(info.getProp());
- if (StringUtils.isNotEmpty(propValue)) {
- info.setValue(propValue);
- addPropertyToValPool(info);
- continue;
- }
- addPropertyToValPool(info);
- if (StringUtils.isEmpty(info.getValue())) {
- log.warn("The output parameter {} value is empty and cannot
find the out parameter from task output",
- info);
+ if (CollectionUtils.isNotEmpty(outProperty) &&
MapUtils.isNotEmpty(taskOutputParams)) {
+ // Inject the value
+ for (Property info : outProperty) {
+ String value = taskOutputParams.get(info.getProp());
+ if (value != null) {
+ info.setValue(value);
+ }
}
}
+
+ varPool = VarPoolUtils.mergeVarPool(Lists.newArrayList(varPool,
outProperty));
}
- public List<Property> getOutProperty(List<Property> params) {
+ protected List<Property> getOutProperty(List<Property> params) {
if (CollectionUtils.isEmpty(params)) {
return new ArrayList<>();
}
- List<Property> result = new ArrayList<>();
- for (Property info : params) {
- if (info.getDirect() == Direct.OUT) {
- result.add(info);
- }
- }
- return result;
+ return params.stream()
+ .filter(info -> info.getDirect() == Direct.OUT)
+ .collect(Collectors.toList());
}
public List<Map<String, String>> getListMapByString(String json) {
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/SqlParameters.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/SqlParameters.java
index 0f1a893a30..75ebe6c9cd 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/SqlParameters.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/SqlParameters.java
@@ -27,6 +27,7 @@ import
org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import
org.apache.dolphinscheduler.plugin.task.api.parameters.resource.DataSourceParameters;
import
org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper;
import
org.apache.dolphinscheduler.plugin.task.api.parameters.resource.UdfFuncParameters;
+import org.apache.dolphinscheduler.plugin.task.api.utils.VarPoolUtils;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
@@ -40,6 +41,7 @@ import java.util.stream.Collectors;
import com.google.common.base.Enums;
import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
/**
* Sql/Hql parameter
@@ -245,7 +247,7 @@ public class SqlParameters extends AbstractParameters {
return;
}
if (StringUtils.isEmpty(result)) {
- varPool.addAll(outProperty);
+ varPool = VarPoolUtils.mergeVarPool(Lists.newArrayList(varPool,
outProperty));
return;
}
List<Map<String, String>> sqlResult = getListMapByString(result);
@@ -268,7 +270,6 @@ public class SqlParameters extends AbstractParameters {
for (Property info : outProperty) {
if (info.getType() == DataType.LIST) {
info.setValue(JSONUtils.toJsonString(sqlResultFormat.get(info.getProp())));
- varPool.add(info);
}
}
} else {
@@ -276,9 +277,9 @@ public class SqlParameters extends AbstractParameters {
Map<String, String> firstRow = sqlResult.get(0);
for (Property info : outProperty) {
info.setValue(String.valueOf(firstRow.get(info.getProp())));
- varPool.add(info);
}
}
+ varPool = VarPoolUtils.mergeVarPool(Lists.newArrayList(varPool,
outProperty));
}
@@ -322,6 +323,7 @@ public class SqlParameters extends AbstractParameters {
/**
* TODO SQLTaskExecutionContext needs to be optimized
+ *
* @param parametersHelper
* @return
*/
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/VarPoolUtils.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/VarPoolUtils.java
new file mode 100644
index 0000000000..7c24eb9a21
--- /dev/null
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/VarPoolUtils.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.api.utils;
+
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
+import org.apache.dolphinscheduler.plugin.task.api.model.Property;
+
+import org.apache.commons.collections4.CollectionUtils;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import lombok.experimental.UtilityClass;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+@UtilityClass
+public class VarPoolUtils {
+
+ public List<Property> deserializeVarPool(String varPoolJson) {
+ return JSONUtils.toList(varPoolJson, Property.class);
+ }
+
+ /**
+ * @see #mergeVarPool(List)
+ */
+ public String mergeVarPoolJsonString(List<String> varPoolJsons) {
+ if (CollectionUtils.isEmpty(varPoolJsons)) {
+ return null;
+ }
+ List<List<Property>> varPools = varPoolJsons.stream()
+ .map(VarPoolUtils::deserializeVarPool)
+ .collect(Collectors.toList());
+ List<Property> finalVarPool = mergeVarPool(varPools);
+ return JSONUtils.toJsonString(finalVarPool);
+ }
+
+ /**
+ * Merge the given varpools in list order, and return the merged varpool.
+ * When more than one varpool is given, properties whose {@link Property#getDirect()} is not OUT are skipped, and if several varpools contain the same {@link Property#getProp()}, the property from the later varpool in the list will be used.
+ * // todo: we may need to consider the datatype of the property
+ */
+ public List<Property> mergeVarPool(List<List<Property>> varPools) {
+ if (CollectionUtils.isEmpty(varPools)) {
+ return null;
+ }
+ if (varPools.size() == 1) {
+ return varPools.get(0);
+ }
+ Map<String, Property> result = new HashMap<>();
+ for (List<Property> varPool : varPools) {
+ if (CollectionUtils.isEmpty(varPool)) {
+ continue;
+ }
+ for (Property property : varPool) {
+ if (!Direct.OUT.equals(property.getDirect())) {
+ log.info("The direct should be OUT in varPool, but got
{}", property.getDirect());
+ continue;
+ }
+ result.put(property.getProp(), property);
+ }
+ }
+ return new ArrayList<>(result.values());
+ }
+
+ public String subtractVarPoolJson(String varPool, List<String>
subtractVarPool) {
+ List<Property> varPoolList = deserializeVarPool(varPool);
+ List<List<Property>> subtractVarPoolList = subtractVarPool.stream()
+ .map(VarPoolUtils::deserializeVarPool)
+ .collect(Collectors.toList());
+ List<Property> finalVarPool = subtractVarPool(varPoolList,
subtractVarPoolList);
+ return JSONUtils.toJsonString(finalVarPool);
+ }
+
+ /**
+ * Return the properties of varPool whose key ({@link Property#getProp()}) does not appear in any of the subtractVarPool lists.
+ */
+ public List<Property> subtractVarPool(List<Property> varPool,
List<List<Property>> subtractVarPool) {
+ if (CollectionUtils.isEmpty(varPool)) {
+ return null;
+ }
+ if (CollectionUtils.isEmpty(subtractVarPool)) {
+ return varPool;
+ }
+ Map<String, Property> subtractVarPoolMap = new HashMap<>();
+ for (List<Property> properties : subtractVarPool) {
+ for (Property property : properties) {
+ subtractVarPoolMap.put(property.getProp(), property);
+ }
+ }
+ List<Property> result = new ArrayList<>();
+ for (Property property : varPool) {
+ if (!subtractVarPoolMap.containsKey(property.getProp())) {
+ result.add(property);
+ }
+ }
+ return result;
+ }
+
+}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/utils/VarPoolUtilsTest.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/utils/VarPoolUtilsTest.java
new file mode 100644
index 0000000000..231d97029f
--- /dev/null
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/utils/VarPoolUtilsTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.api.utils;
+
+import org.apache.dolphinscheduler.plugin.task.api.enums.DataType;
+import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
+import org.apache.dolphinscheduler.plugin.task.api.model.Property;
+
+import java.util.List;
+
+import org.junit.jupiter.api.Test;
+
+import com.google.common.collect.Lists;
+import com.google.common.truth.Truth;
+
+class VarPoolUtilsTest {
+
+ @Test
+ void mergeVarPool() {
+ Truth.assertThat(VarPoolUtils.mergeVarPool(null)).isNull();
+
+ // Override the value of the same property
+ // Merge the property with different key.
+ List<Property> varpool1 = Lists.newArrayList(new Property("name",
Direct.OUT, DataType.VARCHAR, "tom"));
+ List<Property> varpool2 = Lists.newArrayList(
+ new Property("name", Direct.OUT, DataType.VARCHAR, "tim"),
+ new Property("age", Direct.OUT, DataType.INTEGER, "10"));
+
+
Truth.assertThat(VarPoolUtils.mergeVarPool(Lists.newArrayList(varpool1,
varpool2)))
+ .containsExactly(
+ new Property("name", Direct.OUT, DataType.VARCHAR,
"tim"),
+ new Property("age", Direct.OUT, DataType.INTEGER,
"10"));
+
+ }
+
+ @Test
+ void subtractVarPool() {
+ Truth.assertThat(VarPoolUtils.subtractVarPool(null, null)).isNull();
+ List<Property> varpool1 = Lists.newArrayList(new Property("name",
Direct.OUT, DataType.VARCHAR, "tom"),
+ new Property("age", Direct.OUT, DataType.INTEGER, "10"));
+ List<Property> varpool2 = Lists.newArrayList(new Property("name",
Direct.OUT, DataType.VARCHAR, "tom"));
+ List<Property> varpool3 = Lists.newArrayList(new Property("location",
Direct.OUT, DataType.VARCHAR, "china"));
+
+ Truth.assertThat(VarPoolUtils.subtractVarPool(varpool1,
Lists.newArrayList(varpool2, varpool3)))
+ .containsExactly(new Property("age", Direct.OUT,
DataType.INTEGER, "10"));
+ }
+}
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
index 8dd842a8ed..797d8f78a3 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
@@ -25,6 +25,7 @@ import
org.apache.dolphinscheduler.common.thread.DefaultUncaughtExceptionHandler
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.meter.metrics.MetricsProvider;
import org.apache.dolphinscheduler.meter.metrics.SystemMetrics;
+import
org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceProcessorProvider;
import org.apache.dolphinscheduler.plugin.storage.api.StorageConfiguration;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
@@ -86,6 +87,7 @@ public class WorkerServer implements IStoppable {
public void run() {
this.workerRpcServer.start();
TaskPluginManager.loadPlugin();
+ DataSourceProcessorProvider.initialize();
this.workerRegistryClient.setRegistryStoppable(this);
this.workerRegistryClient.start();
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutor.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutor.java
index 41cef62028..e64a421001 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutor.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutor.java
@@ -283,6 +283,7 @@ public abstract class WorkerTaskExecutor implements
Runnable {
// upload out files and modify the "OUT FILE" property in VarPool
TaskFilesTransferUtils.uploadOutputFiles(taskExecutionContext,
storageOperate);
+
log.info("Upload output files: {} successfully",
TaskFilesTransferUtils.getFileLocalParams(taskExecutionContext, Direct.OUT));
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskFilesTransferUtils.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskFilesTransferUtils.java
index 87438c614b..f060c0a17d 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskFilesTransferUtils.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskFilesTransferUtils.java
@@ -70,17 +70,18 @@ public class TaskFilesTransferUtils {
*/
public static void uploadOutputFiles(TaskExecutionContext
taskExecutionContext,
StorageOperate storageOperate) throws
TaskException {
- List<Property> varPools = getVarPools(taskExecutionContext);
- // get map of varPools for quick search
- Map<String, Property> varPoolsMap =
varPools.stream().collect(Collectors.toMap(Property::getProp, x -> x));
-
// get OUTPUT FILE parameters
List<Property> localParamsProperty =
getFileLocalParams(taskExecutionContext, Direct.OUT);
-
if (localParamsProperty.isEmpty()) {
return;
}
+ List<Property> varPools = getVarPools(taskExecutionContext);
+ // get map of varPools for quick search
+ Map<String, Property> varPoolsMap = varPools.stream()
+ .filter(property -> Direct.OUT.equals(property.getDirect()))
+ .collect(Collectors.toMap(Property::getProp, x -> x));
+
log.info("Upload output files ...");
for (Property property : localParamsProperty) {
// get local file path
@@ -137,10 +138,6 @@ public class TaskFilesTransferUtils {
* @throws TaskException task exception
*/
public static void downloadUpstreamFiles(TaskExecutionContext
taskExecutionContext, StorageOperate storageOperate) {
- List<Property> varPools = getVarPools(taskExecutionContext);
- // get map of varPools for quick search
- Map<String, Property> varPoolsMap =
varPools.stream().collect(Collectors.toMap(Property::getProp, x -> x));
-
// get "IN FILE" parameters
List<Property> localParamsProperty =
getFileLocalParams(taskExecutionContext, Direct.IN);
@@ -148,6 +145,13 @@ public class TaskFilesTransferUtils {
return;
}
+ List<Property> varPools = getVarPools(taskExecutionContext);
+ // get map of varPools for quick search
+ Map<String, Property> varPoolsMap = varPools
+ .stream()
+ .filter(property -> Direct.IN.equals(property.getDirect()))
+ .collect(Collectors.toMap(Property::getProp, x -> x));
+
String executePath = taskExecutionContext.getExecutePath();
// data path to download packaged data
String downloadTmpPath = String.format("%s/%s", executePath,
DOWNLOAD_TMP);