This is an automated email from the ASF dual-hosted git repository.
caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new f46faa02c3 [Fix-10452] Serial wait for policy recovery (#10453)
f46faa02c3 is described below
commit f46faa02c39b6c31c345fdc75218b3fc15f64ae3
Author: WangJPLeo <[email protected]>
AuthorDate: Wed Jun 22 14:28:50 2022 +0800
[Fix-10452] Serial wait for policy recovery (#10453)
* Serial wait for policy recovery
* processInstance state check null
* add sendresult(Host host, Command command) method and replace the
original.
---
.../api/service/impl/ExecutorServiceImpl.java | 7 ++--
.../master/runner/WorkflowExecuteRunnable.java | 9 +++--
.../master/runner/WorkflowExecuteThreadPool.java | 10 ++---
.../master/runner/task/SubTaskProcessor.java | 6 +--
.../processor/StateEventCallbackService.java | 8 ++++
.../service/process/ProcessServiceImpl.java | 46 +++++++++++-----------
6 files changed, 49 insertions(+), 37 deletions(-)
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
index 0d7376681d..396dcd3811 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
@@ -65,6 +65,7 @@ import
org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
+import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.service.corn.CronUtils;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
@@ -499,13 +500,11 @@ public class ExecutorServiceImpl extends BaseServiceImpl
implements ExecutorServ
// determine whether the process is normal
if (update > 0) {
- String host = processInstance.getHost();
- String address = host.split(":")[0];
- int port = Integer.parseInt(host.split(":")[1]);
StateEventChangeCommand stateEventChangeCommand = new
StateEventChangeCommand(
processInstance.getId(), 0, processInstance.getState(),
processInstance.getId(), 0
);
- stateEventCallbackService.sendResult(address, port,
stateEventChangeCommand.convert2Command());
+ Host host = new Host(processInstance.getHost());
+ stateEventCallbackService.sendResult(host,
stateEventChangeCommand.convert2Command());
putMsg(result, Status.SUCCESS);
} else {
putMsg(result, Status.EXECUTE_PROCESS_INSTANCE_ERROR);
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index 801b482be2..c7d8b5c0b2 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -684,14 +684,13 @@ public class WorkflowExecuteRunnable implements Runnable {
if (stateEvent.getExecutionStatus() == ExecutionStatus.STOP) {
// serial wait execution type needs to wake up the waiting
process
- if (processDefinition.getExecutionType().typeIsSerialWait()){
+ if (processDefinition.getExecutionType().typeIsSerialWait() ||
processDefinition.getExecutionType().typeIsSerialPriority()){
endProcess();
return true;
}
this.updateProcessInstanceState(stateEvent);
return true;
}
-
if (processComplementData()) {
return true;
}
@@ -832,7 +831,7 @@ public class WorkflowExecuteRunnable implements Runnable {
*/
public void endProcess() {
this.stateEvents.clear();
- if (processDefinition.getExecutionType().typeIsSerialWait()) {
+ if (processDefinition.getExecutionType().typeIsSerialWait() ||
processDefinition.getExecutionType().typeIsSerialPriority()) {
checkSerialProcess(processDefinition);
}
if (processInstance.getState().typeIsWaitingThread()) {
@@ -855,6 +854,10 @@ public class WorkflowExecuteRunnable implements Runnable {
if (nextProcessInstance == null) {
return;
}
+ ProcessInstance nextReadyStopProcessInstance =
this.processService.loadNextProcess4Serial(processInstance.getProcessDefinition().getCode(),
ExecutionStatus.READY_STOP.getCode(), processInstance.getId());
+ if (processDefinition.getExecutionType().typeIsSerialPriority() &&
nextReadyStopProcessInstance != null) {
+ return;
+ }
nextInstanceId = nextProcessInstance.getId();
}
ProcessInstance nextProcessInstance =
this.processService.findProcessInstanceById(nextInstanceId);
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
index b658f669f7..ec003762d7 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
@@ -26,6 +26,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
+import org.apache.dolphinscheduler.remote.utils.Host;
import
org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import
org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics;
@@ -182,16 +183,15 @@ public class WorkflowExecuteThreadPool extends
ThreadPoolTaskExecutor {
* notify process's master
*/
private void notifyProcess(ProcessInstance finishProcessInstance,
ProcessInstance processInstance, TaskInstance taskInstance) {
- String host = processInstance.getHost();
- if (Strings.isNullOrEmpty(host)) {
+ String processInstanceHost = processInstance.getHost();
+ if (Strings.isNullOrEmpty(processInstanceHost)) {
logger.error("process {} host is empty, cannot notify task {}
now", processInstance.getId(), taskInstance.getId());
return;
}
- String address = host.split(":")[0];
- int port = Integer.parseInt(host.split(":")[1]);
StateEventChangeCommand stateEventChangeCommand = new
StateEventChangeCommand(
finishProcessInstance.getId(), 0,
finishProcessInstance.getState(), processInstance.getId(), taskInstance.getId()
);
- stateEventCallbackService.sendResult(address, port,
stateEventChangeCommand.convert2Command());
+ Host host = new Host(processInstanceHost);
+ stateEventCallbackService.sendResult(host,
stateEventChangeCommand.convert2Command());
}
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
index d870192049..72d6a7adb4 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
@@ -29,6 +29,7 @@ import
org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
+import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.utils.LogUtils;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
@@ -215,9 +216,8 @@ public class SubTaskProcessor extends BaseTaskProcessor {
StateEventChangeCommand stateEventChangeCommand = new
StateEventChangeCommand(
processInstance.getId(), taskInstance.getId(),
subProcessInstance.getState(), subProcessInstance.getId(), 0
);
- String address = subProcessInstance.getHost().split(":")[0];
- int port =
Integer.parseInt(subProcessInstance.getHost().split(":")[1]);
- this.stateEventCallbackService.sendResult(address, port,
stateEventChangeCommand.convert2Command());
+ Host host = new Host(subProcessInstance.getHost());
+ this.stateEventCallbackService.sendResult(host,
stateEventChangeCommand.convert2Command());
}
@Override
diff --git
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/StateEventCallbackService.java
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/StateEventCallbackService.java
index af51831068..fb2b411523 100644
---
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/StateEventCallbackService.java
+++
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/StateEventCallbackService.java
@@ -122,4 +122,12 @@ public class StateEventCallbackService {
nettyRemoteChannel.writeAndFlush(command);
}
}
+
+ public void sendResult(Host host, Command command) {
+ logger.info("send result, host:{}, command:{}", host.getAddress(),
command.toString());
+ NettyRemoteChannel nettyRemoteChannel = newRemoteChannel(host);
+ if (nettyRemoteChannel != null) {
+ nettyRemoteChannel.writeAndFlush(command);
+ }
+ }
}
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index b66fb915bb..d03d165071 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -345,25 +345,29 @@ public class ProcessServiceImpl implements ProcessService
{
} else if
(processDefinition.getExecutionType().typeIsSerialPriority()) {
List<ProcessInstance> runningProcessInstances =
this.processInstanceMapper.queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion(),
Constants.RUNNING_PROCESS_STATE, processInstance.getId());
- if (CollectionUtils.isNotEmpty(runningProcessInstances)) {
- for (ProcessInstance info : runningProcessInstances) {
- info.setCommandType(CommandType.STOP);
- info.addHistoryCmd(CommandType.STOP);
- info.setState(ExecutionStatus.READY_STOP);
- int update = updateProcessInstance(info);
- // determine whether the process is normal
- if (update > 0) {
- String host = info.getHost();
- String address = host.split(":")[0];
- int port = Integer.parseInt(host.split(":")[1]);
- StateEventChangeCommand stateEventChangeCommand = new
StateEventChangeCommand(
+ if (CollectionUtils.isEmpty(runningProcessInstances)) {
+ processInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
+ saveProcessInstance(processInstance);
+ return;
+ }
+ for (ProcessInstance info : runningProcessInstances) {
+ if (Objects.nonNull(info.getState()) &&
(ExecutionStatus.READY_STOP.equals(info.getState()) ||
info.getState().typeIsFinished())) {
+ continue;
+ }
+ info.setCommandType(CommandType.STOP);
+ info.addHistoryCmd(CommandType.STOP);
+ info.setState(ExecutionStatus.READY_STOP);
+ int update = updateProcessInstance(info);
+ // determine whether the process is normal
+ if (update > 0) {
+ StateEventChangeCommand stateEventChangeCommand = new
StateEventChangeCommand(
info.getId(), 0, info.getState(), info.getId(), 0
- );
- try {
- stateEventCallbackService.sendResult(address,
port, stateEventChangeCommand.convert2Command());
- } catch (Exception e) {
- logger.error("sendResultError");
- }
+ );
+ try {
+ Host host = new Host(info.getHost());
+ stateEventCallbackService.sendResult(host,
stateEventChangeCommand.convert2Command());
+ } catch (Exception e) {
+ logger.error("sendResultError",e );
}
}
}
@@ -3037,13 +3041,11 @@ public class ProcessServiceImpl implements
ProcessService {
@Override
public void sendStartTask2Master(ProcessInstance processInstance, int
taskId,
org.apache.dolphinscheduler.remote.command.CommandType taskType) {
- String host = processInstance.getHost();
- String address = host.split(":")[0];
- int port = Integer.parseInt(host.split(":")[1]);
TaskEventChangeCommand taskEventChangeCommand = new
TaskEventChangeCommand(
processInstance.getId(), taskId
);
- stateEventCallbackService.sendResult(address, port,
taskEventChangeCommand.convert2Command(taskType));
+ Host host = new Host(processInstance.getHost());
+ stateEventCallbackService.sendResult(host,
taskEventChangeCommand.convert2Command(taskType));
}
@Override