This is an automated email from the ASF dual-hosted git repository. caishunfeng pushed a commit to branch 3.1.0-prepare in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
commit 714e258be6e3f666e1c5711fd6575ebbb987c8d3 Author: Stalary <[email protected]> AuthorDate: Thu Sep 15 10:00:38 2022 +0800 [Bug][Dependent]: Id also clone due to duplicate when use dependent mode. (#11929) --- .../api/service/impl/ExecutorServiceImpl.java | 6 +- .../api/service/ExecutorServiceTest.java | 64 +++++++++++++++++++++- 2 files changed, 65 insertions(+), 5 deletions(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java index 957447dd68..5755362d99 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java @@ -889,7 +889,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ /** * create complement dependent command */ - protected int createComplementDependentCommand(List<Schedule> schedules, Command command) { + public int createComplementDependentCommand(List<Schedule> schedules, Command command) { int dependentProcessDefinitionCreateCount = 0; Command dependentCommand; @@ -903,9 +903,11 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ List<DependentProcessDefinition> dependentProcessDefinitionList = getComplementDependentDefinitionList(dependentCommand.getProcessDefinitionCode(), CronUtils.getMaxCycle(schedules.get(0).getCrontab()), dependentCommand.getWorkerGroup()); - dependentCommand.setTaskDependType(TaskDependType.TASK_POST); for (DependentProcessDefinition dependentProcessDefinition : dependentProcessDefinitionList) { + // If the id is Integer, the auto-increment id will be obtained by mybatis-plus + // and causing duplicate when clone it. + dependentCommand.setId(null); dependentCommand.setProcessDefinitionCode(dependentProcessDefinition.getProcessDefinitionCode()); dependentCommand.setWorkerGroup(dependentProcessDefinition.getWorkerGroup()); Map<String, String> cmdParam = JSONUtils.toMap(dependentCommand.getCommandParam()); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java index 9affbd2c2f..59418e2171 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java @@ -20,18 +20,30 @@ package org.apache.dolphinscheduler.api.service; import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.RERUN; import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_START; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import org.apache.dolphinscheduler.api.enums.ExecuteType; import org.apache.dolphinscheduler.api.enums.Status; +import org.apache.dolphinscheduler.api.permission.ResourcePermissionCheckService; import org.apache.dolphinscheduler.api.service.impl.BaseServiceImpl; import org.apache.dolphinscheduler.api.service.impl.ExecutorServiceImpl; import org.apache.dolphinscheduler.api.service.impl.ProjectServiceImpl; import org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.common.enums.*; +import org.apache.dolphinscheduler.common.enums.CommandType; +import org.apache.dolphinscheduler.common.enums.ComplementDependentMode; +import org.apache.dolphinscheduler.common.enums.FailureStrategy; +import org.apache.dolphinscheduler.common.enums.Priority; +import org.apache.dolphinscheduler.common.enums.ReleaseState; +import org.apache.dolphinscheduler.common.enums.RunMode; +import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus; +import org.apache.dolphinscheduler.common.enums.WarningType; +import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.dao.entity.Command; +import org.apache.dolphinscheduler.dao.entity.DependentProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.Project; @@ -45,16 +57,17 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.TaskGroupQueueMapper; -import org.apache.dolphinscheduler.api.permission.ResourcePermissionCheckService; import org.apache.dolphinscheduler.service.process.ProcessService; import java.util.ArrayList; import java.util.Collections; +import java.util.Date; import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; +import org.assertj.core.util.Lists; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -177,7 +190,8 @@ public class ExecutorServiceTest { .thenReturn(checkProjectAndAuth()); Mockito.when(processDefinitionMapper.queryByCode(processDefinitionCode)).thenReturn(processDefinition); Mockito.when(processService.getTenantForProcess(tenantId, userId)).thenReturn(new Tenant()); - Mockito.when(processService.createCommand(any(Command.class))).thenReturn(1); + doReturn(1).when(processService).createCommand(argThat(c -> c.getId() == null)); + doReturn(0).when(processService).createCommand(argThat(c -> c.getId() != null)); Mockito.when(monitorService.getServerListFromRegistry(true)).thenReturn(getMasterServersList()); Mockito.when(processService.findProcessInstanceDetailById(processInstanceId)).thenReturn(processInstance); Mockito.when(processService.findProcessDefinition(1L, 1)).thenReturn(processDefinition); @@ -236,6 +250,50 @@ public class ExecutorServiceTest { } + @Test + public void testComplementWithDependentMode() { + Schedule schedule = new Schedule(); + schedule.setStartTime(new Date()); + schedule.setEndTime(new Date()); + schedule.setCrontab("0 0 7 * * ? *"); + schedule.setFailureStrategy(FailureStrategy.CONTINUE); + schedule.setReleaseState(ReleaseState.OFFLINE); + schedule.setWarningType(WarningType.NONE); + schedule.setCreateTime(new Date()); + schedule.setUpdateTime(new Date()); + List<Schedule> schedules = Lists.newArrayList(schedule); + Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionCode( + processDefinitionCode)) + .thenReturn(schedules); + + DependentProcessDefinition dependentProcessDefinition = new DependentProcessDefinition(); + dependentProcessDefinition.setProcessDefinitionCode(2); + dependentProcessDefinition.setProcessDefinitionVersion(1); + dependentProcessDefinition.setTaskDefinitionCode(1); + dependentProcessDefinition.setWorkerGroup(Constants.DEFAULT_WORKER_GROUP); + dependentProcessDefinition.setTaskParams( + "{\"localParams\":[],\"resourceList\":[],\"dependence\":{\"relation\":\"AND\",\"dependTaskList\":[{\"relation\":\"AND\",\"dependItemList\":[{\"depTaskCode\":2,\"status\":\"SUCCESS\"}]}]},\"conditionResult\":{\"successNode\":[1],\"failedNode\":[1]}}"); + Mockito.when(processService.queryDependentProcessDefinitionByProcessDefinitionCode(processDefinitionCode)) + .thenReturn(Lists.newArrayList(dependentProcessDefinition)); + + Map<Long, String> processDefinitionWorkerGroupMap = new HashMap<>(); + processDefinitionWorkerGroupMap.put(1L, Constants.DEFAULT_WORKER_GROUP); + Mockito.when(processService.queryWorkerGroupByProcessDefinitionCodes(Lists.newArrayList(1L))) + .thenReturn(processDefinitionWorkerGroupMap); + + Command command = new Command(); + command.setId(1); + command.setCommandType(CommandType.COMPLEMENT_DATA); + command.setCommandParam( + "{\"StartNodeList\":\"1\",\"complementStartDate\":\"2020-01-01 00:00:00\",\"complementEndDate\":\"2020-01-31 23:00:00\"}"); + command.setWorkerGroup(Constants.DEFAULT_WORKER_GROUP); + command.setProcessDefinitionCode(processDefinitionCode); + command.setExecutorId(1); + + int count = executorService.createComplementDependentCommand(schedules, command); + Assert.assertEquals(1, count); + } + /** * date error */
