This is an automated email from the ASF dual-hosted git repository.

caishunfeng pushed a commit to branch 3.1.0-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git

commit 714e258be6e3f666e1c5711fd6575ebbb987c8d3
Author: Stalary <[email protected]>
AuthorDate: Thu Sep 15 10:00:38 2022 +0800

    [Bug][Dependent]: Id also clone due to duplicate when use dependent mode. 
(#11929)
---
 .../api/service/impl/ExecutorServiceImpl.java      |  6 +-
 .../api/service/ExecutorServiceTest.java           | 64 +++++++++++++++++++++-
 2 files changed, 65 insertions(+), 5 deletions(-)

diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
index 957447dd68..5755362d99 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
@@ -889,7 +889,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl 
implements ExecutorServ
     /**
      * create complement dependent command
      */
-    protected int createComplementDependentCommand(List<Schedule> schedules, 
Command command) {
+    public int createComplementDependentCommand(List<Schedule> schedules, 
Command command) {
         int dependentProcessDefinitionCreateCount = 0;
         Command dependentCommand;
 
@@ -903,9 +903,11 @@ public class ExecutorServiceImpl extends BaseServiceImpl 
implements ExecutorServ
         List<DependentProcessDefinition> dependentProcessDefinitionList =
                 
getComplementDependentDefinitionList(dependentCommand.getProcessDefinitionCode(),
                         CronUtils.getMaxCycle(schedules.get(0).getCrontab()), 
dependentCommand.getWorkerGroup());
-
         dependentCommand.setTaskDependType(TaskDependType.TASK_POST);
         for (DependentProcessDefinition dependentProcessDefinition : 
dependentProcessDefinitionList) {
+            // If the id is Integer, the auto-increment id will be obtained by 
mybatis-plus
+            // and causing duplicate when clone it.
+            dependentCommand.setId(null);
             
dependentCommand.setProcessDefinitionCode(dependentProcessDefinition.getProcessDefinitionCode());
             
dependentCommand.setWorkerGroup(dependentProcessDefinition.getWorkerGroup());
             Map<String, String> cmdParam = 
JSONUtils.toMap(dependentCommand.getCommandParam());
diff --git 
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
 
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
index 9affbd2c2f..59418e2171 100644
--- 
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
+++ 
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
@@ -20,18 +20,30 @@ package org.apache.dolphinscheduler.api.service;
 import static 
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.RERUN;
 import static 
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_START;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
 import org.apache.dolphinscheduler.api.enums.ExecuteType;
 import org.apache.dolphinscheduler.api.enums.Status;
+import 
org.apache.dolphinscheduler.api.permission.ResourcePermissionCheckService;
 import org.apache.dolphinscheduler.api.service.impl.BaseServiceImpl;
 import org.apache.dolphinscheduler.api.service.impl.ExecutorServiceImpl;
 import org.apache.dolphinscheduler.api.service.impl.ProjectServiceImpl;
 import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.common.enums.*;
+import org.apache.dolphinscheduler.common.enums.CommandType;
+import org.apache.dolphinscheduler.common.enums.ComplementDependentMode;
+import org.apache.dolphinscheduler.common.enums.FailureStrategy;
+import org.apache.dolphinscheduler.common.enums.Priority;
+import org.apache.dolphinscheduler.common.enums.ReleaseState;
+import org.apache.dolphinscheduler.common.enums.RunMode;
+import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
+import org.apache.dolphinscheduler.common.enums.WarningType;
+import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
 import org.apache.dolphinscheduler.common.model.Server;
 import org.apache.dolphinscheduler.dao.entity.Command;
+import org.apache.dolphinscheduler.dao.entity.DependentProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.Project;
@@ -45,16 +57,17 @@ import 
org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskGroupQueueMapper;
-import 
org.apache.dolphinscheduler.api.permission.ResourcePermissionCheckService;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
+import org.assertj.core.util.Lists;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -177,7 +190,8 @@ public class ExecutorServiceTest {
                 .thenReturn(checkProjectAndAuth());
         
Mockito.when(processDefinitionMapper.queryByCode(processDefinitionCode)).thenReturn(processDefinition);
         Mockito.when(processService.getTenantForProcess(tenantId, 
userId)).thenReturn(new Tenant());
-        
Mockito.when(processService.createCommand(any(Command.class))).thenReturn(1);
+        doReturn(1).when(processService).createCommand(argThat(c -> c.getId() 
== null));
+        doReturn(0).when(processService).createCommand(argThat(c -> c.getId() 
!= null));
         
Mockito.when(monitorService.getServerListFromRegistry(true)).thenReturn(getMasterServersList());
         
Mockito.when(processService.findProcessInstanceDetailById(processInstanceId)).thenReturn(processInstance);
         Mockito.when(processService.findProcessDefinition(1L, 
1)).thenReturn(processDefinition);
@@ -236,6 +250,50 @@ public class ExecutorServiceTest {
 
     }
 
+    @Test
+    public void testComplementWithDependentMode() {
+        Schedule schedule = new Schedule();
+        schedule.setStartTime(new Date());
+        schedule.setEndTime(new Date());
+        schedule.setCrontab("0 0 7 * * ? *");
+        schedule.setFailureStrategy(FailureStrategy.CONTINUE);
+        schedule.setReleaseState(ReleaseState.OFFLINE);
+        schedule.setWarningType(WarningType.NONE);
+        schedule.setCreateTime(new Date());
+        schedule.setUpdateTime(new Date());
+        List<Schedule> schedules = Lists.newArrayList(schedule);
+        
Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionCode(
+                processDefinitionCode))
+                .thenReturn(schedules);
+
+        DependentProcessDefinition dependentProcessDefinition = new 
DependentProcessDefinition();
+        dependentProcessDefinition.setProcessDefinitionCode(2);
+        dependentProcessDefinition.setProcessDefinitionVersion(1);
+        dependentProcessDefinition.setTaskDefinitionCode(1);
+        
dependentProcessDefinition.setWorkerGroup(Constants.DEFAULT_WORKER_GROUP);
+        dependentProcessDefinition.setTaskParams(
+                
"{\"localParams\":[],\"resourceList\":[],\"dependence\":{\"relation\":\"AND\",\"dependTaskList\":[{\"relation\":\"AND\",\"dependItemList\":[{\"depTaskCode\":2,\"status\":\"SUCCESS\"}]}]},\"conditionResult\":{\"successNode\":[1],\"failedNode\":[1]}}");
+        
Mockito.when(processService.queryDependentProcessDefinitionByProcessDefinitionCode(processDefinitionCode))
+                .thenReturn(Lists.newArrayList(dependentProcessDefinition));
+
+        Map<Long, String> processDefinitionWorkerGroupMap = new HashMap<>();
+        processDefinitionWorkerGroupMap.put(1L, 
Constants.DEFAULT_WORKER_GROUP);
+        
Mockito.when(processService.queryWorkerGroupByProcessDefinitionCodes(Lists.newArrayList(1L)))
+                .thenReturn(processDefinitionWorkerGroupMap);
+
+        Command command = new Command();
+        command.setId(1);
+        command.setCommandType(CommandType.COMPLEMENT_DATA);
+        command.setCommandParam(
+                "{\"StartNodeList\":\"1\",\"complementStartDate\":\"2020-01-01 
00:00:00\",\"complementEndDate\":\"2020-01-31 23:00:00\"}");
+        command.setWorkerGroup(Constants.DEFAULT_WORKER_GROUP);
+        command.setProcessDefinitionCode(processDefinitionCode);
+        command.setExecutorId(1);
+
+        int count = 
executorService.createComplementDependentCommand(schedules, command);
+        Assert.assertEquals(1, count);
+    }
+
     /**
      * date error
      */

Reply via email to