This is an automated email from the ASF dual-hosted git repository.
caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new e5cca0e79b [Fix-11007] [Master] fix forced_success bug (#11088)
e5cca0e79b is described below
commit e5cca0e79bfe16d07e931bcc68c279643ad45fab
Author: JinYong Li <[email protected]>
AuthorDate: Sat Jul 30 23:28:31 2022 +0800
[Fix-11007] [Master] fix forced_success bug (#11088)
* fix forced_success bug
* add comments
* add transactional
* refactor code
Co-authored-by: JinyLeeChina <[email protected]>
---
.../api/service/impl/TaskInstanceServiceImpl.java | 9 +++--
.../service/process/ProcessService.java | 2 ++
.../service/process/ProcessServiceImpl.java | 38 ++++++++++++++++++++--
3 files changed, 43 insertions(+), 6 deletions(-)
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
index 103612f3d7..1364915203 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
@@ -17,6 +17,9 @@
package org.apache.dolphinscheduler.api.service.impl;
+import static
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.FORCED_SUCCESS;
+import static
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_INSTANCE;
+
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.ProcessInstanceService;
import org.apache.dolphinscheduler.api.service.ProjectService;
@@ -46,13 +49,11 @@ import java.util.stream.Collectors;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
-import static
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.FORCED_SUCCESS;
-import static
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_INSTANCE;
-
/**
* task instance service impl
*/
@@ -166,6 +167,7 @@ public class TaskInstanceServiceImpl extends
BaseServiceImpl implements TaskInst
* @param taskInstanceId task instance id
* @return the result code and msg
*/
+ @Transactional
@Override
public Map<String, Object> forceTaskSuccess(User loginUser, long
projectCode, Integer taskInstanceId) {
Project project = projectMapper.queryByCode(projectCode);
@@ -198,6 +200,7 @@ public class TaskInstanceServiceImpl extends
BaseServiceImpl implements TaskInst
task.setState(ExecutionStatus.FORCED_SUCCESS);
int changedNum = taskInstanceMapper.updateById(task);
if (changedNum > 0) {
+
processService.forceProcessInstanceSuccessByTaskInstanceId(taskInstanceId);
putMsg(result, Status.SUCCESS);
} else {
putMsg(result, Status.FORCE_TASK_SUCCESS_ERROR);
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index fd33eb115a..976fd90986 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -299,4 +299,6 @@ public interface ProcessService {
ProcessInstance loadNextProcess4Serial(long code, int state, int id);
public String findConfigYamlByName(String clusterName) ;
+
+ void forceProcessInstanceSuccessByTaskInstanceId(Integer taskInstanceId);
}
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index a305ca35a0..7dc4ce7bed 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -1337,10 +1337,10 @@ public class ProcessServiceImpl implements
ProcessService {
*
* @param parentInstance parentInstance
* @param parentTask parentTask
+ * @param processMap processMap
* @return process instance map
*/
- private ProcessInstanceMap setProcessInstanceMap(ProcessInstance
parentInstance, TaskInstance parentTask) {
- ProcessInstanceMap processMap =
findWorkProcessMapByParent(parentInstance.getId(), parentTask.getId());
+ private ProcessInstanceMap setProcessInstanceMap(ProcessInstance
parentInstance, TaskInstance parentTask, ProcessInstanceMap processMap) {
if (processMap != null) {
return processMap;
}
@@ -1404,11 +1404,16 @@ public class ProcessServiceImpl implements
ProcessService {
// recover failover tolerance would not create a new command when
the sub command already have been created
return;
}
- instanceMap = setProcessInstanceMap(parentProcessInstance, task);
+ instanceMap = setProcessInstanceMap(parentProcessInstance, task,
instanceMap);
ProcessInstance childInstance = null;
if (instanceMap.getProcessInstanceId() != 0) {
childInstance =
findProcessInstanceById(instanceMap.getProcessInstanceId());
}
+ if (childInstance != null && childInstance.getState() ==
ExecutionStatus.SUCCESS
+ && CommandType.START_FAILURE_TASK_PROCESS ==
parentProcessInstance.getCommandType()) {
+ logger.info("sub process instance {} status is success, so skip
creating command", childInstance.getId());
+ return;
+ }
Command subProcessCommand =
createSubProcessCommand(parentProcessInstance, childInstance, instanceMap,
task);
updateSubProcessDefinitionByParent(parentProcessInstance,
subProcessCommand.getProcessDefinitionCode());
initSubInstanceState(childInstance);
@@ -3106,4 +3111,31 @@ public class ProcessServiceImpl implements
ProcessService {
K8s k8s = k8sMapper.selectOne(nodeWrapper);
return k8s.getK8sConfig();
}
+
+ @Override
+ public void forceProcessInstanceSuccessByTaskInstanceId(Integer
taskInstanceId) {
+ TaskInstance task = taskInstanceMapper.selectById(taskInstanceId);
+ if (task == null) {
+ return;
+ }
+ ProcessInstance processInstance =
findProcessInstanceDetailById(task.getProcessInstanceId());
+ if (processInstance != null &&
(processInstance.getState().typeIsFailure() ||
processInstance.getState().typeIsCancel())) {
+ List<TaskInstance> validTaskList =
findValidTaskListByProcessId(processInstance.getId());
+ List<Long> instanceTaskCodeList =
validTaskList.stream().map(TaskInstance::getTaskCode).collect(Collectors.toList());
+ List<ProcessTaskRelation> taskRelations =
findRelationByCode(processInstance.getProcessDefinitionCode(),
+ processInstance.getProcessDefinitionVersion());
+ List<TaskDefinitionLog> taskDefinitionLogs =
genTaskDefineList(taskRelations);
+ List<Long> definiteTaskCodeList =
taskDefinitionLogs.stream().filter(definitionLog -> definitionLog.getFlag() ==
Flag.YES)
+ .map(TaskDefinitionLog::getCode).collect(Collectors.toList());
+ // proceed only when every enabled task definition has a task instance
+ if
(org.apache.dolphinscheduler.common.utils.CollectionUtils.equalLists(instanceTaskCodeList,
definiteTaskCodeList)) {
+ List<Integer> failTaskList =
validTaskList.stream().filter(instance -> instance.getState().typeIsFailure()
|| instance.getState().typeIsCancel())
+ .map(TaskInstance::getId).collect(Collectors.toList());
+ if (failTaskList.size() == 1 &&
failTaskList.contains(taskInstanceId)) {
+ processInstance.setState(ExecutionStatus.SUCCESS);
+ updateProcessInstance(processInstance);
+ }
+ }
+ }
+ }
}