This is an automated email from the ASF dual-hosted git repository.

caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new d7df899909 [Fix-11003]Task group queue is not updated to final state 
(#11004)
d7df899909 is described below

commit d7df89990965155e7695da402dcf322ec276355a
Author: Mr.An <[email protected]>
AuthorDate: Tue Jul 19 17:48:16 2022 +0800

    [Fix-11003]Task group queue is not updated to final state (#11004)
    
    * fix after the task group is forced to start the task. Task group status 
has not changed
---
 .../service/process/ProcessServiceImpl.java        | 50 +++++++++++-----------
 1 file changed, 25 insertions(+), 25 deletions(-)

diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index 476cb03d7f..a305ca35a0 100644
--- 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.dolphinscheduler.service.process;
 
+import static java.util.stream.Collectors.toSet;
 import static 
org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
 import static 
org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST;
 import static 
org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
@@ -31,8 +32,6 @@ import static 
org.apache.dolphinscheduler.plugin.task.api.enums.DataType.VARCHAR
 import static org.apache.dolphinscheduler.plugin.task.api.enums.Direct.IN;
 import static 
org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TASK_INSTANCE_ID;
 
-import static java.util.stream.Collectors.toSet;
-
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.AuthorizationType;
 import org.apache.dolphinscheduler.common.enums.CommandType;
@@ -322,14 +321,14 @@ public class ProcessServiceImpl implements ProcessService 
{
         //when we get the running instance(or waiting instance) only get the 
priority instance(by id)
         if (processDefinition.getExecutionType().typeIsSerialWait()) {
             List<ProcessInstance> runningProcessInstances = 
this.processInstanceMapper.queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId(processInstance.getProcessDefinitionCode(),
-                    processInstance.getProcessDefinitionVersion(), 
Constants.RUNNING_PROCESS_STATE, processInstance.getId());
+                processInstance.getProcessDefinitionVersion(), 
Constants.RUNNING_PROCESS_STATE, processInstance.getId());
             if (CollectionUtils.isEmpty(runningProcessInstances)) {
                 processInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
                 saveProcessInstance(processInstance);
             }
         } else if (processDefinition.getExecutionType().typeIsSerialDiscard()) 
{
             List<ProcessInstance> runningProcessInstances = 
this.processInstanceMapper.queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId(processInstance.getProcessDefinitionCode(),
-                    processInstance.getProcessDefinitionVersion(), 
Constants.RUNNING_PROCESS_STATE, processInstance.getId());
+                processInstance.getProcessDefinitionVersion(), 
Constants.RUNNING_PROCESS_STATE, processInstance.getId());
             if (CollectionUtils.isNotEmpty(runningProcessInstances)) {
                 processInstance.setState(ExecutionStatus.STOP);
                 saveProcessInstance(processInstance);
@@ -339,7 +338,7 @@ public class ProcessServiceImpl implements ProcessService {
             saveProcessInstance(processInstance);
         } else if 
(processDefinition.getExecutionType().typeIsSerialPriority()) {
             List<ProcessInstance> runningProcessInstances = 
this.processInstanceMapper.queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId(processInstance.getProcessDefinitionCode(),
-                    processInstance.getProcessDefinitionVersion(), 
Constants.RUNNING_PROCESS_STATE, processInstance.getId());
+                processInstance.getProcessDefinitionVersion(), 
Constants.RUNNING_PROCESS_STATE, processInstance.getId());
             if (CollectionUtils.isEmpty(runningProcessInstances)) {
                 processInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
                 saveProcessInstance(processInstance);
@@ -356,13 +355,13 @@ public class ProcessServiceImpl implements ProcessService 
{
                 // determine whether the process is normal
                 if (update > 0) {
                     StateEventChangeCommand stateEventChangeCommand = new 
StateEventChangeCommand(
-                            info.getId(), 0, info.getState(), info.getId(), 0
+                        info.getId(), 0, info.getState(), info.getId(), 0
                     );
                     try {
                         Host host = new Host(info.getHost());
                         stateEventCallbackService.sendResult(host, 
stateEventChangeCommand.convert2Command());
                     } catch (Exception e) {
-                        logger.error("sendResultError",e );
+                        logger.error("sendResultError", e);
                     }
                 }
             }
@@ -798,10 +797,10 @@ public class ProcessServiceImpl implements ProcessService 
{
         }
 
         String globalParams = 
curingGlobalParamsService.curingGlobalParams(processInstance.getId(),
-                processDefinition.getGlobalParamMap(),
-                processDefinition.getGlobalParamList(),
-                getCommandTypeIfComplement(processInstance, command),
-                processInstance.getScheduleTime(), timezoneId);
+            processDefinition.getGlobalParamMap(),
+            processDefinition.getGlobalParamList(),
+            getCommandTypeIfComplement(processInstance, command),
+            processInstance.getScheduleTime(), timezoneId);
         processInstance.setGlobalParams(globalParams);
 
         // set process instance priority
@@ -952,10 +951,10 @@ public class ProcessServiceImpl implements ProcessService 
{
 
             // Recalculate global parameters after rerun.
             String globalParams = 
curingGlobalParamsService.curingGlobalParams(processInstance.getId(),
-                    processDefinition.getGlobalParamMap(),
-                    processDefinition.getGlobalParamList(),
-                    commandTypeIfComplement,
-                    processInstance.getScheduleTime(), timezoneId);
+                processDefinition.getGlobalParamMap(),
+                processDefinition.getGlobalParamList(),
+                commandTypeIfComplement,
+                processInstance.getScheduleTime(), timezoneId);
             processInstance.setGlobalParams(globalParams);
             processInstance.setProcessDefinition(processDefinition);
         }
@@ -1146,9 +1145,9 @@ public class ProcessServiceImpl implements ProcessService 
{
         String timezoneId = cmdParam.get(Constants.SCHEDULE_TIMEZONE);
 
         String globalParams = 
curingGlobalParamsService.curingGlobalParams(processInstance.getId(),
-                processDefinition.getGlobalParamMap(),
-                processDefinition.getGlobalParamList(),
-                CommandType.COMPLEMENT_DATA, 
processInstance.getScheduleTime(), timezoneId);
+            processDefinition.getGlobalParamMap(),
+            processDefinition.getGlobalParamList(),
+            CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime(), 
timezoneId);
         processInstance.setGlobalParams(globalParams);
     }
 
@@ -1310,9 +1309,9 @@ public class ProcessServiceImpl implements ProcessService 
{
         TaskInstance task = submitTaskInstanceToDB(taskInstance, 
processInstance);
         if (task == null) {
             logger.error("Save taskInstance to db error, task name:{}, process 
id:{} state: {} ",
-                         taskInstance.getName(),
-                         taskInstance.getProcessInstance().getId(),
-                         processInstance.getState());
+                taskInstance.getName(),
+                taskInstance.getProcessInstance().getId(),
+                processInstance.getState());
             return null;
         }
 
@@ -1559,9 +1558,9 @@ public class ProcessServiceImpl implements ProcessService 
{
         ExecutionStatus processInstanceState = processInstance.getState();
         if (processInstanceState.typeIsFinished() || processInstanceState == 
ExecutionStatus.READY_STOP) {
             logger.warn("processInstance: {} state was: {}, skip submit this 
task, taskCode: {}",
-                        processInstance.getId(),
-                        processInstanceState,
-                        taskInstance.getTaskCode());
+                processInstance.getId(),
+                processInstanceState,
+                taskInstance.getTaskCode());
             return null;
         }
         if (processInstanceState == ExecutionStatus.READY_PAUSE) {
@@ -2989,7 +2988,7 @@ public class ProcessServiceImpl implements ProcessService 
{
             return null;
         }
         try {
-            while (taskGroupMapper.releaseTaskGroupResource(taskGroup.getId(), 
taskGroup.getUseSize()
+            while (thisTaskGroupQueue.getForceStart() == Flag.NO.getCode() && 
taskGroupMapper.releaseTaskGroupResource(taskGroup.getId(), 
taskGroup.getUseSize()
                 , thisTaskGroupQueue.getId(), 
TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode()) != 1) {
                 thisTaskGroupQueue = 
this.taskGroupQueueMapper.queryByTaskId(taskInstance.getId());
                 if (thisTaskGroupQueue.getStatus() == 
TaskGroupQueueStatus.RELEASE) {
@@ -3089,6 +3088,7 @@ public class ProcessServiceImpl implements ProcessService 
{
             throw new ServiceException("delete command fail, id:" + commandId);
         }
     }
+
     /**
      * find k8s config yaml by clusterName
      *

Reply via email to