This is an automated email from the ASF dual-hosted git repository.

caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new e5cca0e79b [Fix-11007] [Master] fix forced_success bug (#11088)
e5cca0e79b is described below

commit e5cca0e79bfe16d07e931bcc68c279643ad45fab
Author: JinYong Li <[email protected]>
AuthorDate: Sat Jul 30 23:28:31 2022 +0800

    [Fix-11007] [Master] fix forced_success bug (#11088)
    
    * fix forced_success bug
    
    * add comments
    
    * add transactional
    
    * refactor code
    
    Co-authored-by: JinyLeeChina <[email protected]>
---
 .../api/service/impl/TaskInstanceServiceImpl.java  |  9 +++--
 .../service/process/ProcessService.java            |  2 ++
 .../service/process/ProcessServiceImpl.java        | 38 ++++++++++++++++++++--
 3 files changed, 43 insertions(+), 6 deletions(-)

diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
index 103612f3d7..1364915203 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
@@ -17,6 +17,9 @@
 
 package org.apache.dolphinscheduler.api.service.impl;
 
+import static 
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.FORCED_SUCCESS;
+import static 
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_INSTANCE;
+
 import org.apache.dolphinscheduler.api.enums.Status;
 import org.apache.dolphinscheduler.api.service.ProcessInstanceService;
 import org.apache.dolphinscheduler.api.service.ProjectService;
@@ -46,13 +49,11 @@ import java.util.stream.Collectors;
 
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
 
 import com.baomidou.mybatisplus.core.metadata.IPage;
 import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
 
-import static 
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.FORCED_SUCCESS;
-import static 
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_INSTANCE;
-
 /**
  * task instance service impl
  */
@@ -166,6 +167,7 @@ public class TaskInstanceServiceImpl extends 
BaseServiceImpl implements TaskInst
      * @param taskInstanceId task instance id
      * @return the result code and msg
      */
+    @Transactional
     @Override
     public Map<String, Object> forceTaskSuccess(User loginUser, long 
projectCode, Integer taskInstanceId) {
         Project project = projectMapper.queryByCode(projectCode);
@@ -198,6 +200,7 @@ public class TaskInstanceServiceImpl extends 
BaseServiceImpl implements TaskInst
         task.setState(ExecutionStatus.FORCED_SUCCESS);
         int changedNum = taskInstanceMapper.updateById(task);
         if (changedNum > 0) {
+            
processService.forceProcessInstanceSuccessByTaskInstanceId(taskInstanceId);
             putMsg(result, Status.SUCCESS);
         } else {
             putMsg(result, Status.FORCE_TASK_SUCCESS_ERROR);
diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index fd33eb115a..976fd90986 100644
--- 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -299,4 +299,6 @@ public interface ProcessService {
     ProcessInstance loadNextProcess4Serial(long code, int state, int id);
 
     public String findConfigYamlByName(String clusterName) ;
+
+    void forceProcessInstanceSuccessByTaskInstanceId(Integer taskInstanceId);
 }
diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index a305ca35a0..7dc4ce7bed 100644
--- 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -1337,10 +1337,10 @@ public class ProcessServiceImpl implements 
ProcessService {
      *
      * @param parentInstance parentInstance
      * @param parentTask     parentTask
+     * @param processMap     processMap
      * @return process instance map
      */
-    private ProcessInstanceMap setProcessInstanceMap(ProcessInstance 
parentInstance, TaskInstance parentTask) {
-        ProcessInstanceMap processMap = 
findWorkProcessMapByParent(parentInstance.getId(), parentTask.getId());
+    private ProcessInstanceMap setProcessInstanceMap(ProcessInstance 
parentInstance, TaskInstance parentTask, ProcessInstanceMap processMap) {
         if (processMap != null) {
             return processMap;
         }
@@ -1404,11 +1404,16 @@ public class ProcessServiceImpl implements 
ProcessService {
             // recover failover tolerance would not create a new command when 
the sub command already have been created
             return;
         }
-        instanceMap = setProcessInstanceMap(parentProcessInstance, task);
+        instanceMap = setProcessInstanceMap(parentProcessInstance, task, 
instanceMap);
         ProcessInstance childInstance = null;
         if (instanceMap.getProcessInstanceId() != 0) {
             childInstance = 
findProcessInstanceById(instanceMap.getProcessInstanceId());
         }
+        if (childInstance != null && childInstance.getState() == 
ExecutionStatus.SUCCESS
+            && CommandType.START_FAILURE_TASK_PROCESS == 
parentProcessInstance.getCommandType()) {
+            logger.info("sub process instance {} status is success, so skip 
creating command", childInstance.getId());
+            return;
+        }
         Command subProcessCommand = 
createSubProcessCommand(parentProcessInstance, childInstance, instanceMap, 
task);
         updateSubProcessDefinitionByParent(parentProcessInstance, 
subProcessCommand.getProcessDefinitionCode());
         initSubInstanceState(childInstance);
@@ -3106,4 +3111,31 @@ public class ProcessServiceImpl implements 
ProcessService {
         K8s k8s = k8sMapper.selectOne(nodeWrapper);
         return k8s.getK8sConfig();
     }
+
+    @Override
+    public void forceProcessInstanceSuccessByTaskInstanceId(Integer 
taskInstanceId) {
+        TaskInstance task = taskInstanceMapper.selectById(taskInstanceId);
+        if (task == null) {
+            return;
+        }
+        ProcessInstance processInstance = 
findProcessInstanceDetailById(task.getProcessInstanceId());
+        if (processInstance != null && 
(processInstance.getState().typeIsFailure() || 
processInstance.getState().typeIsCancel())) {
+            List<TaskInstance> validTaskList = 
findValidTaskListByProcessId(processInstance.getId());
+            List<Long> instanceTaskCodeList = 
validTaskList.stream().map(TaskInstance::getTaskCode).collect(Collectors.toList());
+            List<ProcessTaskRelation> taskRelations = 
findRelationByCode(processInstance.getProcessDefinitionCode(),
+                processInstance.getProcessDefinitionVersion());
+            List<TaskDefinitionLog> taskDefinitionLogs = 
genTaskDefineList(taskRelations);
+            List<Long> definiteTaskCodeList = 
taskDefinitionLogs.stream().filter(definitionLog -> definitionLog.getFlag() == 
Flag.YES)
+                .map(TaskDefinitionLog::getCode).collect(Collectors.toList());
+            // only all tasks have instances
+            if 
(org.apache.dolphinscheduler.common.utils.CollectionUtils.equalLists(instanceTaskCodeList,
 definiteTaskCodeList)) {
+                List<Integer> failTaskList = 
validTaskList.stream().filter(instance -> instance.getState().typeIsFailure() 
|| instance.getState().typeIsCancel())
+                    .map(TaskInstance::getId).collect(Collectors.toList());
+                if (failTaskList.size() == 1 && 
failTaskList.contains(taskInstanceId)) {
+                    processInstance.setState(ExecutionStatus.SUCCESS);
+                    updateProcessInstance(processInstance);
+                }
+            }
+        }
+    }
 }

Reply via email to