This is an automated email from the ASF dual-hosted git repository.

caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new f46faa02c3 [Fix-10452] Serial wait for policy recovery (#10453)
f46faa02c3 is described below

commit f46faa02c39b6c31c345fdc75218b3fc15f64ae3
Author: WangJPLeo <[email protected]>
AuthorDate: Wed Jun 22 14:28:50 2022 +0800

    [Fix-10452] Serial wait for policy recovery (#10453)
    
    * Serial wait for policy recovery
    
    * add null check for processInstance state
    
    * add sendResult(Host host, Command command) method and replace the
      original.
---
 .../api/service/impl/ExecutorServiceImpl.java      |  7 ++--
 .../master/runner/WorkflowExecuteRunnable.java     |  9 +++--
 .../master/runner/WorkflowExecuteThreadPool.java   | 10 ++---
 .../master/runner/task/SubTaskProcessor.java       |  6 +--
 .../processor/StateEventCallbackService.java       |  8 ++++
 .../service/process/ProcessServiceImpl.java        | 46 +++++++++++-----------
 6 files changed, 49 insertions(+), 37 deletions(-)

diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
index 0d7376681d..396dcd3811 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
@@ -65,6 +65,7 @@ import 
org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
 import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
 import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
+import org.apache.dolphinscheduler.remote.utils.Host;
 import org.apache.dolphinscheduler.service.corn.CronUtils;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.slf4j.Logger;
@@ -499,13 +500,11 @@ public class ExecutorServiceImpl extends BaseServiceImpl 
implements ExecutorServ
 
         // determine whether the process is normal
         if (update > 0) {
-            String host = processInstance.getHost();
-            String address = host.split(":")[0];
-            int port = Integer.parseInt(host.split(":")[1]);
             StateEventChangeCommand stateEventChangeCommand = new 
StateEventChangeCommand(
                     processInstance.getId(), 0, processInstance.getState(), 
processInstance.getId(), 0
             );
-            stateEventCallbackService.sendResult(address, port, 
stateEventChangeCommand.convert2Command());
+            Host host = new Host(processInstance.getHost());
+            stateEventCallbackService.sendResult(host, 
stateEventChangeCommand.convert2Command());
             putMsg(result, Status.SUCCESS);
         } else {
             putMsg(result, Status.EXECUTE_PROCESS_INSTANCE_ERROR);
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index 801b482be2..c7d8b5c0b2 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -684,14 +684,13 @@ public class WorkflowExecuteRunnable implements Runnable {
 
             if (stateEvent.getExecutionStatus() == ExecutionStatus.STOP) {
                 // serial wait execution type needs to wake up the waiting 
process
-                if (processDefinition.getExecutionType().typeIsSerialWait()){
+                if (processDefinition.getExecutionType().typeIsSerialWait() || 
processDefinition.getExecutionType().typeIsSerialPriority()){
                     endProcess();
                     return true;
                 }
                 this.updateProcessInstanceState(stateEvent);
                 return true;
             }
-
             if (processComplementData()) {
                 return true;
             }
@@ -832,7 +831,7 @@ public class WorkflowExecuteRunnable implements Runnable {
      */
     public void endProcess() {
         this.stateEvents.clear();
-        if (processDefinition.getExecutionType().typeIsSerialWait()) {
+        if (processDefinition.getExecutionType().typeIsSerialWait() || 
processDefinition.getExecutionType().typeIsSerialPriority()) {
             checkSerialProcess(processDefinition);
         }
         if (processInstance.getState().typeIsWaitingThread()) {
@@ -855,6 +854,10 @@ public class WorkflowExecuteRunnable implements Runnable {
             if (nextProcessInstance == null) {
                 return;
             }
+            ProcessInstance nextReadyStopProcessInstance = 
this.processService.loadNextProcess4Serial(processInstance.getProcessDefinition().getCode(),
 ExecutionStatus.READY_STOP.getCode(), processInstance.getId());
+            if (processDefinition.getExecutionType().typeIsSerialPriority() && 
nextReadyStopProcessInstance != null) {
+                return;
+            }
             nextInstanceId = nextProcessInstance.getId();
         }
         ProcessInstance nextProcessInstance = 
this.processService.findProcessInstanceById(nextInstanceId);
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
index b658f669f7..ec003762d7 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
@@ -26,6 +26,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
 import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
+import org.apache.dolphinscheduler.remote.utils.Host;
 import 
org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import 
org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics;
@@ -182,16 +183,15 @@ public class WorkflowExecuteThreadPool extends 
ThreadPoolTaskExecutor {
      * notify process's master
      */
     private void notifyProcess(ProcessInstance finishProcessInstance, 
ProcessInstance processInstance, TaskInstance taskInstance) {
-        String host = processInstance.getHost();
-        if (Strings.isNullOrEmpty(host)) {
+        String processInstanceHost = processInstance.getHost();
+        if (Strings.isNullOrEmpty(processInstanceHost)) {
             logger.error("process {} host is empty, cannot notify task {} 
now", processInstance.getId(), taskInstance.getId());
             return;
         }
-        String address = host.split(":")[0];
-        int port = Integer.parseInt(host.split(":")[1]);
         StateEventChangeCommand stateEventChangeCommand = new 
StateEventChangeCommand(
                 finishProcessInstance.getId(), 0, 
finishProcessInstance.getState(), processInstance.getId(), taskInstance.getId()
         );
-        stateEventCallbackService.sendResult(address, port, 
stateEventChangeCommand.convert2Command());
+        Host host = new Host(processInstanceHost);
+        stateEventCallbackService.sendResult(host, 
stateEventChangeCommand.convert2Command());
     }
 }
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
index d870192049..72d6a7adb4 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
@@ -29,6 +29,7 @@ import 
org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
 import org.apache.dolphinscheduler.plugin.task.api.model.Property;
 import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
 import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
+import org.apache.dolphinscheduler.remote.utils.Host;
 import org.apache.dolphinscheduler.server.utils.LogUtils;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 
@@ -215,9 +216,8 @@ public class SubTaskProcessor extends BaseTaskProcessor {
         StateEventChangeCommand stateEventChangeCommand = new 
StateEventChangeCommand(
                 processInstance.getId(), taskInstance.getId(), 
subProcessInstance.getState(), subProcessInstance.getId(), 0
         );
-        String address = subProcessInstance.getHost().split(":")[0];
-        int port = 
Integer.parseInt(subProcessInstance.getHost().split(":")[1]);
-        this.stateEventCallbackService.sendResult(address, port, 
stateEventChangeCommand.convert2Command());
+        Host host = new Host(subProcessInstance.getHost());
+        this.stateEventCallbackService.sendResult(host, 
stateEventChangeCommand.convert2Command());
     }
 
     @Override
diff --git 
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/StateEventCallbackService.java
 
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/StateEventCallbackService.java
index af51831068..fb2b411523 100644
--- 
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/StateEventCallbackService.java
+++ 
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/StateEventCallbackService.java
@@ -122,4 +122,12 @@ public class StateEventCallbackService {
             nettyRemoteChannel.writeAndFlush(command);
         }
     }
+
+    public void sendResult(Host host, Command command) {
+        logger.info("send result, host:{}, command:{}", host.getAddress(), 
command.toString());
+        NettyRemoteChannel nettyRemoteChannel = newRemoteChannel(host);
+        if (nettyRemoteChannel != null) {
+            nettyRemoteChannel.writeAndFlush(command);
+        }
+    }
 }
diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index b66fb915bb..d03d165071 100644
--- 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -345,25 +345,29 @@ public class ProcessServiceImpl implements ProcessService 
{
         } else if 
(processDefinition.getExecutionType().typeIsSerialPriority()) {
             List<ProcessInstance> runningProcessInstances = 
this.processInstanceMapper.queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId(processInstance.getProcessDefinitionCode(),
                     processInstance.getProcessDefinitionVersion(), 
Constants.RUNNING_PROCESS_STATE, processInstance.getId());
-            if (CollectionUtils.isNotEmpty(runningProcessInstances)) {
-                for (ProcessInstance info : runningProcessInstances) {
-                    info.setCommandType(CommandType.STOP);
-                    info.addHistoryCmd(CommandType.STOP);
-                    info.setState(ExecutionStatus.READY_STOP);
-                    int update = updateProcessInstance(info);
-                    // determine whether the process is normal
-                    if (update > 0) {
-                        String host = info.getHost();
-                        String address = host.split(":")[0];
-                        int port = Integer.parseInt(host.split(":")[1]);
-                        StateEventChangeCommand stateEventChangeCommand = new 
StateEventChangeCommand(
+            if (CollectionUtils.isEmpty(runningProcessInstances)) {
+                processInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
+                saveProcessInstance(processInstance);
+                return;
+            }
+            for (ProcessInstance info : runningProcessInstances) {
+                if (Objects.nonNull(info.getState()) && 
(ExecutionStatus.READY_STOP.equals(info.getState()) || 
info.getState().typeIsFinished())) {
+                    continue;
+                }
+                info.setCommandType(CommandType.STOP);
+                info.addHistoryCmd(CommandType.STOP);
+                info.setState(ExecutionStatus.READY_STOP);
+                int update = updateProcessInstance(info);
+                // determine whether the process is normal
+                if (update > 0) {
+                    StateEventChangeCommand stateEventChangeCommand = new 
StateEventChangeCommand(
                             info.getId(), 0, info.getState(), info.getId(), 0
-                        );
-                        try {
-                            stateEventCallbackService.sendResult(address, 
port, stateEventChangeCommand.convert2Command());
-                        } catch (Exception e) {
-                            logger.error("sendResultError");
-                        }
+                    );
+                    try {
+                        Host host = new Host(info.getHost());
+                        stateEventCallbackService.sendResult(host, 
stateEventChangeCommand.convert2Command());
+                    } catch (Exception e) {
+                        logger.error("sendResultError",e );
                     }
                 }
             }
@@ -3037,13 +3041,11 @@ public class ProcessServiceImpl implements 
ProcessService {
     @Override
     public void sendStartTask2Master(ProcessInstance processInstance, int 
taskId,
                                      
org.apache.dolphinscheduler.remote.command.CommandType taskType) {
-        String host = processInstance.getHost();
-        String address = host.split(":")[0];
-        int port = Integer.parseInt(host.split(":")[1]);
         TaskEventChangeCommand taskEventChangeCommand = new 
TaskEventChangeCommand(
             processInstance.getId(), taskId
         );
-        stateEventCallbackService.sendResult(address, port, 
taskEventChangeCommand.convert2Command(taskType));
+        Host host = new Host(processInstance.getHost());
+        stateEventCallbackService.sendResult(host, 
taskEventChangeCommand.convert2Command(taskType));
     }
 
     @Override

Reply via email to