This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new f639a2eed4 [Fix-10854] Fix database restart may lost task instance
status (#10866)
f639a2eed4 is described below
commit f639a2eed464abc0045d970b2e61397b4c488772
Author: Wenjun Ruan <[email protected]>
AuthorDate: Mon Jul 11 09:57:00 2022 +0800
[Fix-10854] Fix database restart may lost task instance status (#10866)
* Fix database update error doesn't rollback the task instance status
* Fix database error may cause workflow dead with running status
---
.../common/enums/StateEventType.java | 2 +-
.../enums/{Event.java => TaskEventType.java} | 2 +-
.../dao/utils/TaskInstanceUtils.java | 79 +++++++
.../dao/utils/TaskInstanceUtilsTest.java | 43 ++++
.../server/master/MasterServer.java | 10 +-
.../server/master/event/TaskDelayEventHandler.java | 120 +++++++++++
.../master/event/TaskDispatchEventHandler.java | 88 ++++++++
.../server/master/event/TaskEventHandleError.java | 17 +-
.../master/event/TaskEventHandleException.java | 17 +-
.../server/master/event/TaskEventHandler.java | 22 +-
.../event/TaskRejectByWorkerEventHandler.java | 77 +++++++
.../master/event/TaskResultEventHandler.java | 117 ++++++++++
.../master/event/TaskRunningEventHandler.java | 118 ++++++++++
.../server/master/event/WorkflowEvent.java | 19 +-
.../master/event/WorkflowEventHandleError.java | 17 +-
.../master/event/WorkflowEventHandleException.java | 17 +-
.../server/master/event/WorkflowEventHandler.java | 19 +-
.../server/master/event/WorkflowEventQueue.java | 48 +++++
.../server/master/event/WorkflowEventType.java | 13 +-
.../master/event/WorkflowStartEventHandler.java | 86 ++++++++
.../master/event/WorkflowStateEventHandler.java | 9 +-
.../master/processor/StateEventProcessor.java | 11 +-
.../processor/queue/StateEventResponseService.java | 6 +-
.../server/master/processor/queue/TaskEvent.java | 12 +-
.../processor/queue/TaskExecuteRunnable.java | 210 +++---------------
.../processor/queue/TaskExecuteThreadPool.java | 31 +--
...rService.java => MasterSchedulerBootstrap.java} | 183 +++++-----------
.../server/master/runner/WorkflowEventLooper.java | 108 ++++++++++
.../master/runner/WorkflowExecuteRunnable.java | 240 ++++++++++++---------
.../remote/command/TaskRecallCommand.java | 1 -
.../service/process/ProcessServiceImpl.java | 20 +-
.../server/worker/cache/ResponseCache.java | 4 +-
.../worker/processor/TaskCallbackService.java | 8 +-
.../worker/runner/RetryReportTaskStatusThread.java | 14 +-
34 files changed, 1253 insertions(+), 535 deletions(-)
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEventType.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEventType.java
index 292202501c..bc021e5e08 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEventType.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEventType.java
@@ -21,7 +21,7 @@ import com.baomidou.mybatisplus.annotation.EnumValue;
public enum StateEventType {
- PROCESS_STATE_CHANGE(0, "process statechange"),
+ PROCESS_STATE_CHANGE(0, "process state change"),
TASK_STATE_CHANGE(1, "task state change"),
PROCESS_TIMEOUT(2, "process timeout"),
TASK_TIMEOUT(3, "task timeout"),
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskEventType.java
similarity index 97%
copy from
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java
copy to
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskEventType.java
index 26c3a3beab..09f85d3f17 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskEventType.java
@@ -17,7 +17,7 @@
package org.apache.dolphinscheduler.common.enums;
-public enum Event {
+public enum TaskEventType {
DISPATCH,
DELAY,
RUNNING,
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/TaskInstanceUtils.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/TaskInstanceUtils.java
new file mode 100644
index 0000000000..fa3bfec0ca
--- /dev/null
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/TaskInstanceUtils.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.dao.utils;
+
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+
+public class TaskInstanceUtils {
+
+ /**
+ * Copy the properties of the given source {@link TaskInstance} to the target.
+ * The copy is shallow (values are handed to the target's setters as-is) and nothing is returned.
+ *
+ * @param source the task instance to copy from.
+ * @param target the task instance to copy to; it is modified in place.
+ */
+ public static void copyTaskInstance(TaskInstance source, TaskInstance
target) {
+ target.setId(source.getId());
+ target.setName(source.getName());
+ target.setTaskType(source.getTaskType());
+ target.setProcessInstanceId(source.getProcessInstanceId());
+ target.setTaskCode(source.getTaskCode());
+ target.setTaskDefinitionVersion(source.getTaskDefinitionVersion());
+ target.setProcessInstanceName(source.getProcessInstanceName());
+ target.setTaskGroupPriority(source.getTaskGroupPriority());
+ target.setState(source.getState());
+ target.setFirstSubmitTime(source.getFirstSubmitTime());
+ target.setSubmitTime(source.getSubmitTime());
+ target.setStartTime(source.getStartTime());
+ target.setEndTime(source.getEndTime());
+ target.setHost(source.getHost());
+ target.setExecutePath(source.getExecutePath());
+ target.setLogPath(source.getLogPath());
+ target.setRetryTimes(source.getRetryTimes());
+ target.setAlertFlag(source.getAlertFlag());
+ target.setProcessInstance(source.getProcessInstance());
+ target.setProcessDefine(source.getProcessDefine());
+ target.setTaskDefine(source.getTaskDefine());
+ target.setPid(source.getPid());
+ target.setAppLink(source.getAppLink());
+ target.setFlag(source.getFlag());
+ target.setDependency(source.getDependency());
+ // todo: we need to copy the task params first and then copy
switchDependency, since setSwitchDependency relies on the task params; this is
really a very bad practice.
+ target.setTaskParams(source.getTaskParams());
+ target.setSwitchDependency(source.getSwitchDependency());
+ target.setDuration(source.getDuration());
+ target.setMaxRetryTimes(source.getMaxRetryTimes());
+ target.setRetryInterval(source.getRetryInterval());
+ target.setTaskInstancePriority(source.getTaskInstancePriority());
+ target.setDependentResult(source.getDependentResult());
+ target.setWorkerGroup(source.getWorkerGroup());
+ target.setEnvironmentCode(source.getEnvironmentCode());
+ target.setEnvironmentConfig(source.getEnvironmentConfig());
+ target.setExecutorId(source.getExecutorId());
+ target.setVarPool(source.getVarPool());
+ target.setExecutorName(source.getExecutorName());
+ target.setResources(source.getResources());
+ target.setDelayTime(source.getDelayTime());
+ target.setDryRun(source.getDryRun());
+ target.setTaskGroupId(source.getTaskGroupId());
+ target.setCpuQuota(source.getCpuQuota());
+ target.setMemoryMax(source.getMemoryMax());
+ }
+
+}
diff --git
a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/TaskInstanceUtilsTest.java
b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/TaskInstanceUtilsTest.java
new file mode 100644
index 0000000000..497c2fb881
--- /dev/null
+++
b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/TaskInstanceUtilsTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.dao.utils;
+
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+
+import java.util.Date;
+import java.util.HashMap;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+class TaskInstanceUtilsTest {
+
+ @Test
+ void copyTaskInstance() {
+ TaskInstance source = new TaskInstance();
+ source.setId(1);
+ source.setName("source");
+ source.setSubmitTime(new Date());
+ source.setTaskParams(JSONUtils.toJsonString(new HashMap<>()));
+ TaskInstance target = new TaskInstance();
+ TaskInstanceUtils.copyTaskInstance(source, target);
+ Assertions.assertEquals(target.getId(), source.getId());
+ Assertions.assertEquals(target.getName(), source.getName());
+ }
+}
\ No newline at end of file
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
index d26d33c062..3469cd4852 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
@@ -26,7 +26,7 @@ import
org.apache.dolphinscheduler.server.master.registry.MasterRegistryClient;
import org.apache.dolphinscheduler.server.master.rpc.MasterRPCServer;
import org.apache.dolphinscheduler.server.master.runner.EventExecuteService;
import org.apache.dolphinscheduler.server.master.runner.FailoverExecuteThread;
-import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerService;
+import
org.apache.dolphinscheduler.server.master.runner.MasterSchedulerBootstrap;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.task.TaskPluginManager;
@@ -59,7 +59,7 @@ public class MasterServer implements IStoppable {
private TaskPluginManager taskPluginManager;
@Autowired
- private MasterSchedulerService masterSchedulerService;
+ private MasterSchedulerBootstrap masterSchedulerBootstrap;
@Autowired
private SchedulerApi schedulerApi;
@@ -94,8 +94,8 @@ public class MasterServer implements IStoppable {
this.masterRegistryClient.start();
this.masterRegistryClient.setRegistryStoppable(this);
- this.masterSchedulerService.init();
- this.masterSchedulerService.start();
+ this.masterSchedulerBootstrap.init();
+ this.masterSchedulerBootstrap.start();
this.eventExecuteService.start();
this.failoverExecuteThread.start();
@@ -130,7 +130,7 @@ public class MasterServer implements IStoppable {
ThreadUtils.sleep(Constants.SERVER_CLOSE_WAIT_TIME.toMillis());
// close
this.schedulerApi.close();
- this.masterSchedulerService.close();
+ this.masterSchedulerBootstrap.close();
this.masterRPCServer.close();
this.masterRegistryClient.closeRegistry();
// close spring Context and will invoke method with @PreDestroy
annotation to destroy beans.
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskDelayEventHandler.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskDelayEventHandler.java
new file mode 100644
index 0000000000..8e2dcced23
--- /dev/null
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskDelayEventHandler.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.event;
+
+import org.apache.dolphinscheduler.common.enums.StateEventType;
+import org.apache.dolphinscheduler.common.enums.TaskEventType;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.utils.TaskInstanceUtils;
+import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningAckCommand;
+import
org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
+import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
+import
org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
+import
org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
+import org.apache.dolphinscheduler.service.process.ProcessService;
+
+import java.util.Optional;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+@Component
+public class TaskDelayEventHandler implements TaskEventHandler {
+
+ private final Logger logger =
LoggerFactory.getLogger(TaskDelayEventHandler.class);
+
+ @Autowired
+ private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
+
+ @Autowired
+ private ProcessService processService;
+
+ @Autowired
+ private WorkflowExecuteThreadPool workflowExecuteThreadPool;
+
+ @Override
+ public void handleTaskEvent(TaskEvent taskEvent) throws
TaskEventHandleError {
+ int taskInstanceId = taskEvent.getTaskInstanceId();
+ int processInstanceId = taskEvent.getProcessInstanceId();
+
+ WorkflowExecuteRunnable workflowExecuteRunnable =
+
this.processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
+ if (workflowExecuteRunnable == null) {
+ sendAckToWorker(taskEvent);
+ throw new TaskEventHandleError("Cannot find related workflow
instance from cache");
+ }
+ Optional<TaskInstance> taskInstanceOptional =
workflowExecuteRunnable.getTaskInstance(taskInstanceId);
+ if (!taskInstanceOptional.isPresent()) {
+ sendAckToWorker(taskEvent);
+ return;
+ }
+ TaskInstance taskInstance = taskInstanceOptional.get();
+ if (taskInstance.getState().typeIsFinished()) {
+ logger.warn(
+ "The current task status is: {}, will not handle the running
event, this event is delay, will discard this event: {}",
+ taskInstance.getState(),
+ taskEvent);
+ sendAckToWorker(taskEvent);
+ return;
+ }
+
+ TaskInstance oldTaskInstance = new TaskInstance();
+ TaskInstanceUtils.copyTaskInstance(taskInstance, oldTaskInstance);
+ try {
+ taskInstance.setState(taskEvent.getState());
+ taskInstance.setStartTime(taskEvent.getStartTime());
+ taskInstance.setHost(taskEvent.getWorkerAddress());
+ taskInstance.setLogPath(taskEvent.getLogPath());
+ taskInstance.setExecutePath(taskEvent.getExecutePath());
+ taskInstance.setPid(taskEvent.getProcessId());
+ taskInstance.setAppLink(taskEvent.getAppIds());
+ if (!processService.updateTaskInstance(taskInstance)) {
+ throw new TaskEventHandleError("Handle task delay event error,
update taskInstance to db failed");
+ }
+ sendAckToWorker(taskEvent);
+ } catch (Exception ex) {
+ TaskInstanceUtils.copyTaskInstance(oldTaskInstance, taskInstance);
+ if (ex instanceof TaskEventHandleError) {
+ throw ex;
+ }
+ throw new TaskEventHandleError("Handle task dispatch event error,
update taskInstance to db failed", ex);
+ }
+ StateEvent stateEvent = new StateEvent();
+ stateEvent.setProcessInstanceId(taskEvent.getProcessInstanceId());
+ stateEvent.setTaskInstanceId(taskEvent.getTaskInstanceId());
+ stateEvent.setExecutionStatus(taskEvent.getState());
+ stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
+ workflowExecuteThreadPool.submitStateEvent(stateEvent);
+
+ }
+
+ private void sendAckToWorker(TaskEvent taskEvent) {
+ // If the event is handled successfully, send an ack to the worker; otherwise the worker
will retry this event
+ TaskExecuteRunningAckCommand taskExecuteRunningAckCommand =
+ new
TaskExecuteRunningAckCommand(ExecutionStatus.SUCCESS.getCode(),
taskEvent.getTaskInstanceId());
+
taskEvent.getChannel().writeAndFlush(taskExecuteRunningAckCommand.convert2Command());
+ }
+
+ @Override
+ public TaskEventType getHandleEventType() {
+ return TaskEventType.DELAY;
+ }
+}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskDispatchEventHandler.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskDispatchEventHandler.java
new file mode 100644
index 0000000000..9378f0c36e
--- /dev/null
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskDispatchEventHandler.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.event;
+
+import org.apache.dolphinscheduler.common.enums.TaskEventType;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.utils.TaskInstanceUtils;
+import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
+import
org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
+import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
+import
org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
+import org.apache.dolphinscheduler.service.process.ProcessService;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+@Component
+public class TaskDispatchEventHandler implements TaskEventHandler {
+
+ private final Logger logger =
LoggerFactory.getLogger(TaskDispatchEventHandler.class);
+
+ @Autowired
+ private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
+
+ @Autowired
+ private ProcessService processService;
+
+ @Override
+ public void handleTaskEvent(TaskEvent taskEvent) throws
TaskEventHandleError {
+ int taskInstanceId = taskEvent.getTaskInstanceId();
+ int processInstanceId = taskEvent.getProcessInstanceId();
+
+ WorkflowExecuteRunnable workflowExecuteRunnable =
+
this.processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
+ if (workflowExecuteRunnable == null) {
+ throw new TaskEventHandleError("Cannot find related workflow
instance from cache");
+ }
+ TaskInstance taskInstance =
workflowExecuteRunnable.getTaskInstance(taskInstanceId)
+ .orElseThrow(() -> new TaskEventHandleError("Cannot find related
taskInstance from cache"));
+ if (taskInstance.getState() != ExecutionStatus.SUBMITTED_SUCCESS) {
+ logger.warn(
+ "The current taskInstance status is not SUBMITTED_SUCCESS, so
the dispatch event will be discarded, the current is a delay event, event: {}",
+ taskEvent);
+ return;
+ }
+
+ // todo: we only need to record the old status and roll back these two
fields; there is no need to copy all fields
+ TaskInstance oldTaskInstance = new TaskInstance();
+ TaskInstanceUtils.copyTaskInstance(taskInstance, oldTaskInstance);
+ // update the taskInstance status
+ taskInstance.setState(ExecutionStatus.DISPATCH);
+ taskInstance.setHost(taskEvent.getWorkerAddress());
+ try {
+ if (!processService.updateTaskInstance(taskInstance)) {
+ throw new TaskEventHandleError("Handle task dispatch event
error, update taskInstance to db failed");
+ }
+ } catch (Exception ex) {
+ // rollback status
+ TaskInstanceUtils.copyTaskInstance(oldTaskInstance, taskInstance);
+ if (ex instanceof TaskEventHandleError) {
+ throw ex;
+ }
+ throw new TaskEventHandleError("Handle task running event error,
update taskInstance to db failed", ex);
+ }
+ }
+
+ @Override
+ public TaskEventType getHandleEventType() {
+ return TaskEventType.DISPATCH;
+ }
+}
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskEventHandleError.java
similarity index 72%
copy from
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java
copy to
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskEventHandleError.java
index 26c3a3beab..deae719f4b 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskEventHandleError.java
@@ -15,12 +15,15 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.common.enums;
+package org.apache.dolphinscheduler.server.master.event;
-public enum Event {
- DISPATCH,
- DELAY,
- RUNNING,
- RESULT,
- WORKER_REJECT
+public class TaskEventHandleError extends Exception {
+
+ public TaskEventHandleError(String message) {
+ super(message);
+ }
+
+ public TaskEventHandleError(String message, Throwable throwable) {
+ super(message, throwable);
+ }
}
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskEventHandleException.java
similarity index 71%
copy from
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java
copy to
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskEventHandleException.java
index 26c3a3beab..e53a2bbcc3 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskEventHandleException.java
@@ -15,12 +15,15 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.common.enums;
+package org.apache.dolphinscheduler.server.master.event;
-public enum Event {
- DISPATCH,
- DELAY,
- RUNNING,
- RESULT,
- WORKER_REJECT
+public class TaskEventHandleException extends Exception {
+
+ public TaskEventHandleException(String message) {
+ super(message);
+ }
+
+ public TaskEventHandleException(String message, Throwable throwable) {
+ super(message, throwable);
+ }
}
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskEventHandler.java
similarity index 57%
copy from
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java
copy to
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskEventHandler.java
index 26c3a3beab..5d214cbb5a 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskEventHandler.java
@@ -15,12 +15,20 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.common.enums;
+package org.apache.dolphinscheduler.server.master.event;
-public enum Event {
- DISPATCH,
- DELAY,
- RUNNING,
- RESULT,
- WORKER_REJECT
+import org.apache.dolphinscheduler.common.enums.TaskEventType;
+import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
+
+public interface TaskEventHandler {
+
+ /**
+ * Handle the task event
+ *
+ * @throws TaskEventHandleError this exception means we will discard
this event.
+ * @throws TaskEventHandleException this exception means we need to retry
this event.
+ */
+ void handleTaskEvent(TaskEvent taskEvent) throws TaskEventHandleError,
TaskEventHandleException;
+
+ TaskEventType getHandleEventType();
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRejectByWorkerEventHandler.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRejectByWorkerEventHandler.java
new file mode 100644
index 0000000000..d09a8364ca
--- /dev/null
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRejectByWorkerEventHandler.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.event;
+
+import org.apache.dolphinscheduler.common.enums.TaskEventType;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.remote.command.TaskRecallAckCommand;
+import
org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
+import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
+import
org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+@Component
+public class TaskRejectByWorkerEventHandler implements TaskEventHandler {
+
+ @Autowired
+ private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
+
+ @Override
+ public void handleTaskEvent(TaskEvent taskEvent) throws
TaskEventHandleError {
+ int taskInstanceId = taskEvent.getTaskInstanceId();
+ int processInstanceId = taskEvent.getProcessInstanceId();
+
+ WorkflowExecuteRunnable workflowExecuteRunnable =
+
this.processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
+ if (workflowExecuteRunnable == null) {
+ sendAckToWorker(taskEvent);
+ throw new TaskEventHandleError(
+ "Handle task reject event error, cannot find related workflow
instance from cache, will discard this event");
+ }
+ TaskInstance taskInstance =
workflowExecuteRunnable.getTaskInstance(taskInstanceId).orElseThrow(() -> {
+ sendAckToWorker(taskEvent);
+ return new TaskEventHandleError(
+ "Handle task reject event error, cannot find the taskInstance
from cache, will discord this event");
+ });
+ try {
+ // todo: If the worker submits multiple reject responses to the master,
the task instance may be dispatched multiple times;
+ // we need to control the worker overload by the master rather than by the
worker.
+ // If the task is resubmitted and the worker fails over, this task may be
dispatched twice?
+ // todo: we need to clear the taskInstance host and roll back the
status to submit.
+ workflowExecuteRunnable.resubmit(taskInstance.getTaskCode());
+ sendAckToWorker(taskEvent);
+ } catch (Exception ex) {
+ throw new TaskEventHandleError("Handle task reject event error",
ex);
+ }
+
+ }
+
+ public void sendAckToWorker(TaskEvent taskEvent) {
+ TaskRecallAckCommand taskRecallAckCommand =
+ new TaskRecallAckCommand(ExecutionStatus.SUCCESS.getCode(),
taskEvent.getTaskInstanceId());
+
taskEvent.getChannel().writeAndFlush(taskRecallAckCommand.convert2Command());
+ }
+
+ @Override
+ public TaskEventType getHandleEventType() {
+ return TaskEventType.WORKER_REJECT;
+ }
+}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskResultEventHandler.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskResultEventHandler.java
new file mode 100644
index 0000000000..67df03682f
--- /dev/null
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskResultEventHandler.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.event;
+
+import org.apache.dolphinscheduler.common.enums.StateEventType;
+import org.apache.dolphinscheduler.common.enums.TaskEventType;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.utils.TaskInstanceUtils;
+import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
+import
org.apache.dolphinscheduler.remote.command.TaskExecuteResponseAckCommand;
+import
org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
+import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
+import
org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
+import
org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
+import org.apache.dolphinscheduler.server.utils.DataQualityResultOperator;
+import org.apache.dolphinscheduler.service.process.ProcessService;
+
+import java.util.Optional;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * Handles {@link TaskEventType#RESULT} task events.
+ *
+ * <p>The handler copies the result fields reported in the event onto the cached task instance,
+ * persists the task instance, acknowledges the worker and finally submits a
+ * {@code TASK_STATE_CHANGE} state event to the workflow execute thread pool.
+ */
+@Component
+public class TaskResultEventHandler implements TaskEventHandler {
+
+    @Autowired
+    private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
+
+    @Autowired
+    private WorkflowExecuteThreadPool workflowExecuteThreadPool;
+
+    @Autowired
+    private DataQualityResultOperator dataQualityResultOperator;
+
+    @Autowired
+    private ProcessService processService;
+
+    /**
+     * Applies a task result event to the cached task instance and persists it.
+     *
+     * <p>If the workflow or task instance is not in the cache, or the task instance is already
+     * finished, the worker is acked and the event is rejected with a {@link TaskEventHandleError}.
+     * If persisting fails, either by exception or because the update reports {@code false}, the
+     * in-memory task instance is restored from a snapshot taken before the modification and the
+     * worker is NOT acked.
+     *
+     * @param taskEvent the result event reported by the worker
+     * @throws TaskEventHandleError     if the event cannot be applied or the task instance cannot be saved
+     * @throws TaskEventHandleException declared by the handler contract; not thrown by this implementation
+     */
+    @Override
+    public void handleTaskEvent(TaskEvent taskEvent) throws TaskEventHandleError, TaskEventHandleException {
+        int taskInstanceId = taskEvent.getTaskInstanceId();
+        int processInstanceId = taskEvent.getProcessInstanceId();
+
+        WorkflowExecuteRunnable workflowExecuteRunnable =
+            this.processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
+        if (workflowExecuteRunnable == null) {
+            sendAckToWorker(taskEvent);
+            throw new TaskEventHandleError(
+                "Handle task result event error, cannot find related workflow instance from cache, will discard this event");
+        }
+        Optional<TaskInstance> taskInstanceOptional = workflowExecuteRunnable.getTaskInstance(taskInstanceId);
+        if (!taskInstanceOptional.isPresent()) {
+            sendAckToWorker(taskEvent);
+            throw new TaskEventHandleError(
+                "Handle task result event error, cannot find the taskInstance from cache, will discord this event");
+        }
+        TaskInstance taskInstance = taskInstanceOptional.get();
+        if (taskInstance.getState().typeIsFinished()) {
+            sendAckToWorker(taskEvent);
+            throw new TaskEventHandleError(
+                "Handle task result event error, the task instance is already finished, will discord this event");
+        }
+        dataQualityResultOperator.operateDqExecuteResult(taskEvent, taskInstance);
+
+        // Snapshot the cached instance so the in-memory state can be rolled back if persisting fails.
+        TaskInstance oldTaskInstance = new TaskInstance();
+        TaskInstanceUtils.copyTaskInstance(taskInstance, oldTaskInstance);
+        try {
+            taskInstance.setStartTime(taskEvent.getStartTime());
+            taskInstance.setHost(taskEvent.getWorkerAddress());
+            taskInstance.setLogPath(taskEvent.getLogPath());
+            taskInstance.setExecutePath(taskEvent.getExecutePath());
+            taskInstance.setPid(taskEvent.getProcessId());
+            taskInstance.setAppLink(taskEvent.getAppIds());
+            taskInstance.setState(taskEvent.getState());
+            taskInstance.setEndTime(taskEvent.getEndTime());
+            taskInstance.setVarPool(taskEvent.getVarPool());
+            processService.changeOutParam(taskInstance);
+            // A false result means nothing was persisted: treat it like a failure, as the running
+            // event handler does, instead of acking the worker with a state the db never saw.
+            if (!processService.updateTaskInstance(taskInstance)) {
+                throw new TaskEventHandleError("Handle task result event error, update taskInstance to db failed");
+            }
+            // Ack only after the state has been persisted.
+            sendAckToWorker(taskEvent);
+        } catch (TaskEventHandleError handleError) {
+            TaskInstanceUtils.copyTaskInstance(oldTaskInstance, taskInstance);
+            throw handleError;
+        } catch (Exception ex) {
+            TaskInstanceUtils.copyTaskInstance(oldTaskInstance, taskInstance);
+            throw new TaskEventHandleError("Handle task result event error, save taskInstance to db error", ex);
+        }
+        StateEvent stateEvent = new StateEvent();
+        stateEvent.setProcessInstanceId(taskEvent.getProcessInstanceId());
+        stateEvent.setTaskInstanceId(taskEvent.getTaskInstanceId());
+        stateEvent.setExecutionStatus(taskEvent.getState());
+        stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
+        workflowExecuteThreadPool.submitStateEvent(stateEvent);
+    }
+
+    /**
+     * Writes a {@link TaskExecuteResponseAckCommand}, built from the SUCCESS status code and the
+     * event's task instance id, to the channel carried by the event.
+     *
+     * @param taskEvent the result event whose channel receives the ack
+     */
+    public void sendAckToWorker(TaskEvent taskEvent) {
+        TaskExecuteResponseAckCommand taskExecuteResponseAckCommand =
+            new TaskExecuteResponseAckCommand(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId());
+        taskEvent.getChannel().writeAndFlush(taskExecuteResponseAckCommand.convert2Command());
+    }
+
+    /**
+     * Returns {@link TaskEventType#RESULT}, the task event type associated with this handler.
+     */
+    @Override
+    public TaskEventType getHandleEventType() {
+        return TaskEventType.RESULT;
+    }
+}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRunningEventHandler.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRunningEventHandler.java
new file mode 100644
index 0000000000..31152973a2
--- /dev/null
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRunningEventHandler.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.event;
+
+import org.apache.dolphinscheduler.common.enums.StateEventType;
+import org.apache.dolphinscheduler.common.enums.TaskEventType;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.utils.TaskInstanceUtils;
+import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningAckCommand;
+import
org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
+import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
+import
org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
+import
org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
+import org.apache.dolphinscheduler.service.process.ProcessService;
+
+import java.util.Optional;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * Handles {@link TaskEventType#RUNNING} task events: applies the running information reported in
+ * the event to the cached task instance, persists it, acknowledges the worker and submits a
+ * {@code TASK_STATE_CHANGE} state event.
+ */
+@Component
+public class TaskRunningEventHandler implements TaskEventHandler {
+    private final Logger logger = LoggerFactory.getLogger(TaskRunningEventHandler.class);
+
+    @Autowired
+    private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
+
+    @Autowired
+    private WorkflowExecuteThreadPool workflowExecuteThreadPool;
+
+    @Autowired
+    private ProcessService processService;
+
+    /**
+     * Applies a task running event to the cached task instance and persists it.
+     *
+     * <p>Events that cannot be matched to a cached workflow or task instance, or that arrive after
+     * the task instance is already finished, are acked and rejected. When persisting fails, the
+     * cached task instance is restored from a snapshot and the worker is not acked.
+     *
+     * @param taskEvent the running event reported by the worker
+     * @throws TaskEventHandleError if the event is rejected or the task instance cannot be updated
+     */
+    @Override
+    public void handleTaskEvent(TaskEvent taskEvent) throws TaskEventHandleError {
+        int taskInstanceId = taskEvent.getTaskInstanceId();
+        int processInstanceId = taskEvent.getProcessInstanceId();
+
+        WorkflowExecuteRunnable workflow =
+            this.processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
+        if (workflow == null) {
+            sendAckToWorker(taskEvent);
+            throw new TaskEventHandleError(
+                "Handle task running event error, cannot find related workflow instance from cache, will discard this event");
+        }
+
+        Optional<TaskInstance> cachedTask = workflow.getTaskInstance(taskInstanceId);
+        if (!cachedTask.isPresent()) {
+            sendAckToWorker(taskEvent);
+            throw new TaskEventHandleError(
+                "Handle running event error, cannot find the taskInstance from cache, will discord this event");
+        }
+
+        TaskInstance current = cachedTask.get();
+        if (current.getState().typeIsFinished()) {
+            sendAckToWorker(taskEvent);
+            throw new TaskEventHandleError(
+                "Handle task running event error, this task instance is already finished, this event is delay, will discard this event");
+        }
+
+        // Keep a copy of the untouched state so it can be restored if the update fails.
+        TaskInstance snapshot = new TaskInstance();
+        TaskInstanceUtils.copyTaskInstance(current, snapshot);
+        try {
+            current.setState(taskEvent.getState());
+            current.setStartTime(taskEvent.getStartTime());
+            current.setHost(taskEvent.getWorkerAddress());
+            current.setLogPath(taskEvent.getLogPath());
+            current.setExecutePath(taskEvent.getExecutePath());
+            current.setPid(taskEvent.getProcessId());
+            current.setAppLink(taskEvent.getAppIds());
+            boolean persisted = processService.updateTaskInstance(current);
+            if (!persisted) {
+                throw new TaskEventHandleError("Handle task running event error, update taskInstance to db failed");
+            }
+            sendAckToWorker(taskEvent);
+        } catch (TaskEventHandleError handleError) {
+            // Already the right exception type: roll back and propagate unchanged.
+            TaskInstanceUtils.copyTaskInstance(snapshot, current);
+            throw handleError;
+        } catch (Exception ex) {
+            TaskInstanceUtils.copyTaskInstance(snapshot, current);
+            throw new TaskEventHandleError("Handle task running event error, update taskInstance to db failed", ex);
+        }
+
+        StateEvent taskStateChange = new StateEvent();
+        taskStateChange.setProcessInstanceId(taskEvent.getProcessInstanceId());
+        taskStateChange.setTaskInstanceId(taskEvent.getTaskInstanceId());
+        taskStateChange.setExecutionStatus(taskEvent.getState());
+        taskStateChange.setType(StateEventType.TASK_STATE_CHANGE);
+        workflowExecuteThreadPool.submitStateEvent(taskStateChange);
+    }
+
+    /**
+     * Writes a {@link TaskExecuteRunningAckCommand}, built from the SUCCESS status code and the
+     * event's task instance id, to the channel carried by the event.
+     */
+    private void sendAckToWorker(TaskEvent taskEvent) {
+        // The worker must receive an ack for this event, otherwise it will retry the event.
+        TaskExecuteRunningAckCommand runningAck =
+            new TaskExecuteRunningAckCommand(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId());
+        taskEvent.getChannel().writeAndFlush(runningAck.convert2Command());
+    }
+
+    /**
+     * Returns {@link TaskEventType#RUNNING}, the task event type associated with this handler.
+     */
+    @Override
+    public TaskEventType getHandleEventType() {
+        return TaskEventType.RUNNING;
+    }
+}
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowEvent.java
similarity index 75%
copy from
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java
copy to
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowEvent.java
index 26c3a3beab..5d5bc2eb32 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowEvent.java
@@ -15,12 +15,17 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.common.enums;
+package org.apache.dolphinscheduler.server.master.event;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+/**
+ * A workflow-level event: an event type paired with the id of the workflow instance it refers to.
+ */
+@Data
+@AllArgsConstructor
+public class WorkflowEvent {
+
+ // The kind of workflow event.
+ private WorkflowEventType workflowEventType;
+
+ // Id of the workflow instance this event refers to.
+ private int workflowInstanceId;
-public enum Event {
- DISPATCH,
- DELAY,
- RUNNING,
- RESULT,
- WORKER_REJECT
}
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowEventHandleError.java
similarity index 71%
copy from
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java
copy to
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowEventHandleError.java
index 26c3a3beab..651f714037 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowEventHandleError.java
@@ -15,12 +15,15 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.common.enums;
+package org.apache.dolphinscheduler.server.master.event;
-public enum Event {
- DISPATCH,
- DELAY,
- RUNNING,
- RESULT,
- WORKER_REJECT
+/**
+ * Checked exception raised while handling a workflow event. Per the {@code WorkflowEventHandler}
+ * contract, it means the event is broken and needs to be dropped.
+ */
+public class WorkflowEventHandleError extends Exception {
+
+ /**
+ * @param message description of the failure
+ */
+ public WorkflowEventHandleError(String message) {
+ super(message);
+ }
+
+ /**
+ * @param message description of the failure
+ * @param throwable the underlying cause
+ */
+ public WorkflowEventHandleError(String message, Throwable throwable) {
+ super(message, throwable);
+ }
}
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowEventHandleException.java
similarity index 70%
copy from
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java
copy to
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowEventHandleException.java
index 26c3a3beab..c9b84fd10d 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowEventHandleException.java
@@ -15,12 +15,15 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.common.enums;
+package org.apache.dolphinscheduler.server.master.event;
-public enum Event {
- DISPATCH,
- DELAY,
- RUNNING,
- RESULT,
- WORKER_REJECT
+/**
+ * Checked exception raised while handling a workflow event. Per the {@code WorkflowEventHandler}
+ * contract, it means the event needs to be retried.
+ */
+public class WorkflowEventHandleException extends Exception {
+
+ /**
+ * @param message description of the failure
+ */
+ public WorkflowEventHandleException(String message) {
+ super(message);
+ }
+
+ /**
+ * @param message description of the failure
+ * @param throwable the underlying cause
+ */
+ public WorkflowEventHandleException(String message, Throwable throwable) {
+ super(message, throwable);
+ }
}
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowEventHandler.java
similarity index 59%
copy from
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java
copy to
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowEventHandler.java
index 26c3a3beab..600d321566 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowEventHandler.java
@@ -15,12 +15,17 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.common.enums;
+package org.apache.dolphinscheduler.server.master.event;
-public enum Event {
- DISPATCH,
- DELAY,
- RUNNING,
- RESULT,
- WORKER_REJECT
+public interface WorkflowEventHandler {
+
+ /**
+ * Handle a workflow event.
+ *
+ * @param workflowEvent the workflow event to handle
+ * @throws WorkflowEventHandleError if thrown, the event is broken and needs to be dropped.
+ * @throws WorkflowEventHandleException if thrown, the event needs to be retried.
+ */
+ void handleWorkflowEvent(WorkflowEvent workflowEvent) throws
WorkflowEventHandleError, WorkflowEventHandleException;
+
+ /**
+ * Returns the workflow event type associated with this handler.
+ */
+ WorkflowEventType getHandleWorkflowEventType();
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowEventQueue.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowEventQueue.java
new file mode 100644
index 0000000000..86c8b90cfa
--- /dev/null
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowEventQueue.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.event;
+
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+/**
+ * Spring component wrapping a single static {@link LinkedBlockingQueue} of {@link WorkflowEvent}s.
+ */
+@Component
+public class WorkflowEventQueue {
+
+    private final Logger logger = LoggerFactory.getLogger(WorkflowEventQueue.class);
+
+    private static final LinkedBlockingQueue<WorkflowEvent> PENDING_EVENTS = new LinkedBlockingQueue<>();
+
+    /**
+     * Appends a workflow event to the tail of the queue and logs it.
+     *
+     * @param workflowEvent the event to enqueue
+     */
+    public void addEvent(WorkflowEvent workflowEvent) {
+        PENDING_EVENTS.add(workflowEvent);
+        logger.info("Added workflow event to workflowEvent queue, event: {}", workflowEvent);
+    }
+
+    /**
+     * Takes the head of the workflow event queue, blocking until an event is available.
+     *
+     * @return the event removed from the head of the queue
+     * @throws InterruptedException if the calling thread is interrupted while waiting
+     */
+    public WorkflowEvent poolEvent() throws InterruptedException {
+        return PENDING_EVENTS.take();
+    }
+
+}
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowEventType.java
similarity index 85%
rename from
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java
rename to
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowEventType.java
index 26c3a3beab..b0f5f09e30 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowEventType.java
@@ -15,12 +15,11 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.common.enums;
+package org.apache.dolphinscheduler.server.master.event;
+
+/**
+ * Types of workflow events; the only constant defined here is {@code START_WORKFLOW}.
+ */
+public enum WorkflowEventType {
+
+ START_WORKFLOW,
+ ;
-public enum Event {
- DISPATCH,
- DELAY,
- RUNNING,
- RESULT,
- WORKER_REJECT
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowStartEventHandler.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowStartEventHandler.java
new file mode 100644
index 0000000000..b4d9fc1f85
--- /dev/null
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowStartEventHandler.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.event;
+
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import
org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
+import
org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics;
+import
org.apache.dolphinscheduler.server.master.runner.StateWheelExecuteThread;
+import
org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
+import
org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
+import org.apache.dolphinscheduler.server.master.runner.WorkflowSubmitStatue;
+
+import java.util.concurrent.CompletableFuture;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * Handles {@link WorkflowEventType#START_WORKFLOW} events by submitting the cached
+ * {@link WorkflowExecuteRunnable} to the workflow execute thread pool.
+ */
+@Component
+public class WorkflowStartEventHandler implements WorkflowEventHandler {
+
+    private final Logger logger = LoggerFactory.getLogger(WorkflowStartEventHandler.class);
+
+    @Autowired
+    private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
+
+    @Autowired
+    private StateWheelExecuteThread stateWheelExecuteThread;
+
+    @Autowired
+    private WorkflowExecuteThreadPool workflowExecuteThreadPool;
+
+    @Autowired
+    private WorkflowEventQueue workflowEventQueue;
+
+    /**
+     * Starts the workflow instance referenced by the event.
+     *
+     * <p>The submit runs asynchronously on the workflow execute thread pool. On a successful
+     * submit the process instance is registered for timeout checking when it has a positive
+     * timeout. On any other submit status a new start event is put back on the workflow event
+     * queue. If the asynchronous submit or its callback throws, the failure is logged rather than
+     * silently lost.
+     *
+     * @param workflowEvent the start event
+     * @throws WorkflowEventHandleError if the workflow instance is not present in the cache
+     */
+    @Override
+    public void handleWorkflowEvent(WorkflowEvent workflowEvent) throws WorkflowEventHandleError {
+        logger.info("Handle workflow start event, begin to start a workflow, event: {}", workflowEvent);
+        WorkflowExecuteRunnable workflowExecuteRunnable =
+            processInstanceExecCacheManager.getByProcessInstanceId(workflowEvent.getWorkflowInstanceId());
+        if (workflowExecuteRunnable == null) {
+            throw new WorkflowEventHandleError(
+                "The workflow start event is invalid, cannot find the workflow instance from cache");
+        }
+        ProcessInstance processInstance = workflowExecuteRunnable.getProcessInstance();
+
+        ProcessInstanceMetrics.incProcessInstanceSubmit();
+        CompletableFuture<WorkflowSubmitStatue> workflowSubmitFuture =
+            CompletableFuture.supplyAsync(workflowExecuteRunnable::call, workflowExecuteThreadPool);
+        workflowSubmitFuture.thenAccept(workflowSubmitStatue -> {
+            if (WorkflowSubmitStatue.SUCCESS == workflowSubmitStatue) {
+                logger.info("Success submit the workflow instance");
+                if (processInstance.getTimeout() > 0) {
+                    stateWheelExecuteThread.addProcess4TimeoutCheck(processInstance);
+                }
+            } else {
+                // submit failed will resend the event to workflow event queue
+                logger.error("Failed to submit the workflow instance, will resend the workflow start event: {}",
+                    workflowEvent);
+                workflowEventQueue.addEvent(new WorkflowEvent(WorkflowEventType.START_WORKFLOW,
+                    processInstance.getId()));
+            }
+        }).exceptionally(throwable -> {
+            // Without this stage an exception thrown by the submit or by the callback above
+            // would complete the future exceptionally and vanish without any trace.
+            logger.error("Submit the workflow instance failed with an unexpected exception, event: {}",
+                workflowEvent, throwable);
+            return null;
+        });
+    }
+
+    /**
+     * Returns {@link WorkflowEventType#START_WORKFLOW}, the workflow event type associated with this handler.
+     */
+    @Override
+    public WorkflowEventType getHandleWorkflowEventType() {
+        return WorkflowEventType.START_WORKFLOW;
+    }
+}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowStateEventHandler.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowStateEventHandler.java
index 37d8ceb1da..3abdd879bb 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowStateEventHandler.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowStateEventHandler.java
@@ -41,15 +41,14 @@ public class WorkflowStateEventHandler implements
StateEventHandler {
ProcessInstance processInstance =
workflowExecuteRunnable.getProcessInstance();
ProcessDefinition processDefinition =
processInstance.getProcessDefinition();
- logger.info("process:{} state {} change to {}",
- processInstance.getId(),
- processInstance.getState(),
- stateEvent.getExecutionStatus());
+ logger.info(
+ "Handle workflow instance state event, the current workflow
instance state {} will be changed to {}",
+ processInstance.getState(), stateEvent.getExecutionStatus());
if (stateEvent.getExecutionStatus() == ExecutionStatus.STOP) {
// serial wait execution type needs to wake up the waiting process
if (processDefinition.getExecutionType().typeIsSerialWait() ||
processDefinition.getExecutionType()
- .typeIsSerialPriority()) {
+
.typeIsSerialPriority()) {
workflowExecuteRunnable.endProcess();
return true;
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java
index f90277d0eb..18afd11ae5 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java
@@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.server.master.processor;
-import org.apache.dolphinscheduler.server.master.event.StateEvent;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
@@ -26,6 +25,7 @@ import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
+import org.apache.dolphinscheduler.server.master.event.StateEvent;
import
org.apache.dolphinscheduler.server.master.processor.queue.StateEventResponseService;
import org.slf4j.Logger;
@@ -67,11 +67,12 @@ public class StateEventProcessor implements
NettyRequestProcessor {
stateEvent.setType(type);
try {
-
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(stateEvent.getProcessInstanceId(),
stateEvent.getTaskInstanceId());
+
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(stateEvent.getProcessInstanceId(),
+ stateEvent.getTaskInstanceId());
- logger.info("Received state event change command, event: {}",
stateEvent);
- stateEventResponseService.addResponse(stateEvent);
- }finally {
+ logger.info("Received state change command, event: {}",
stateEvent);
+ stateEventResponseService.addStateChangeEvent(stateEvent);
+ } finally {
LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java
index bdf80bbee9..80c90ee1b7 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java
@@ -42,9 +42,6 @@ import org.springframework.stereotype.Component;
import io.netty.channel.Channel;
-/**
- * task manager
- */
@Component
public class StateEventResponseService {
@@ -96,7 +93,7 @@ public class StateEventResponseService {
/**
* put task to attemptQueue
*/
- public void addResponse(StateEvent stateEvent) {
+ public void addStateChangeEvent(StateEvent stateEvent) {
try {
// check the event is validated
eventQueue.put(stateEvent);
@@ -154,6 +151,7 @@ public class StateEventResponseService {
}
WorkflowExecuteRunnable workflowExecuteThread =
this.processInstanceExecCacheManager.getByProcessInstanceId(stateEvent.getProcessInstanceId());
+ // We will refresh the task instance status first, if the refresh
failed the event will not be removed
switch (stateEvent.getType()) {
case TASK_STATE_CHANGE:
workflowExecuteThread.refreshTaskInstance(stateEvent.getTaskInstanceId());
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java
index 842bcaf333..248d253739 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java
@@ -17,7 +17,7 @@
package org.apache.dolphinscheduler.server.master.processor.queue;
-import org.apache.dolphinscheduler.common.enums.Event;
+import org.apache.dolphinscheduler.common.enums.TaskEventType;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningCommand;
@@ -83,7 +83,7 @@ public class TaskEvent {
/**
* ack / response
*/
- private Event event;
+ private TaskEventType event;
/**
* varPool
@@ -102,7 +102,7 @@ public class TaskEvent {
event.setProcessInstanceId(processInstanceId);
event.setTaskInstanceId(taskInstanceId);
event.setWorkerAddress(workerAddress);
- event.setEvent(Event.DISPATCH);
+ event.setEvent(TaskEventType.DISPATCH);
return event;
}
@@ -116,7 +116,7 @@ public class TaskEvent {
event.setLogPath(command.getLogPath());
event.setChannel(channel);
event.setWorkerAddress(ChannelUtils.toAddress(channel).getAddress());
- event.setEvent(Event.RUNNING);
+ event.setEvent(TaskEventType.RUNNING);
return event;
}
@@ -134,7 +134,7 @@ public class TaskEvent {
event.setVarPool(command.getVarPool());
event.setChannel(channel);
event.setWorkerAddress(ChannelUtils.toAddress(channel).getAddress());
- event.setEvent(Event.RESULT);
+ event.setEvent(TaskEventType.RESULT);
return event;
}
@@ -143,7 +143,7 @@ public class TaskEvent {
event.setTaskInstanceId(command.getTaskInstanceId());
event.setProcessInstanceId(command.getProcessInstanceId());
event.setChannel(channel);
- event.setEvent(Event.WORKER_REJECT);
+ event.setEvent(TaskEventType.WORKER_REJECT);
return event;
}
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteRunnable.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteRunnable.java
index 9fc96e7564..3c05671a10 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteRunnable.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteRunnable.java
@@ -17,29 +17,18 @@
package org.apache.dolphinscheduler.server.master.processor.queue;
-import org.apache.dolphinscheduler.common.enums.Event;
-import org.apache.dolphinscheduler.common.enums.StateEventType;
+import org.apache.dolphinscheduler.common.enums.TaskEventType;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
-import
org.apache.dolphinscheduler.remote.command.TaskExecuteResponseAckCommand;
-import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningAckCommand;
-import org.apache.dolphinscheduler.remote.command.TaskRecallAckCommand;
-import
org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
-import org.apache.dolphinscheduler.server.master.event.StateEvent;
-import
org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
-import
org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
-import org.apache.dolphinscheduler.server.utils.DataQualityResultOperator;
-import org.apache.dolphinscheduler.service.process.ProcessService;
+import org.apache.dolphinscheduler.server.master.event.TaskEventHandleError;
+import
org.apache.dolphinscheduler.server.master.event.TaskEventHandleException;
+import org.apache.dolphinscheduler.server.master.event.TaskEventHandler;
-import java.util.Optional;
+import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import io.netty.channel.Channel;
-
/**
* task execute thread
*/
@@ -51,34 +40,37 @@ public class TaskExecuteRunnable implements Runnable {
private final ConcurrentLinkedQueue<TaskEvent> events = new
ConcurrentLinkedQueue<>();
- private ProcessService processService;
-
- private WorkflowExecuteThreadPool workflowExecuteThreadPool;
-
- private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
-
- private DataQualityResultOperator dataQualityResultOperator;
+ private final Map<TaskEventType, TaskEventHandler> taskEventHandlerMap;
- public TaskExecuteRunnable(int processInstanceId, ProcessService
processService, WorkflowExecuteThreadPool workflowExecuteThreadPool,
- ProcessInstanceExecCacheManager
processInstanceExecCacheManager, DataQualityResultOperator
dataQualityResultOperator) {
+ public TaskExecuteRunnable(int processInstanceId, Map<TaskEventType,
TaskEventHandler> taskEventHandlerMap) {
this.processInstanceId = processInstanceId;
- this.processService = processService;
- this.workflowExecuteThreadPool = workflowExecuteThreadPool;
- this.processInstanceExecCacheManager = processInstanceExecCacheManager;
- this.dataQualityResultOperator = dataQualityResultOperator;
+ this.taskEventHandlerMap = taskEventHandlerMap;
}
@Override
public void run() {
while (!this.events.isEmpty()) {
+ // the task events of one workflow instance are handled serially, in
the order they were queued, even if they arrive in the wrong order
TaskEvent event = this.events.peek();
try {
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(event.getProcessInstanceId(),
event.getTaskInstanceId());
- persist(event);
- } catch (Exception e) {
- logger.error("persist task event error, event:{}", event, e);
+ logger.info("Handle task event begin: {}", event);
+
taskEventHandlerMap.get(event.getEvent()).handleTaskEvent(event);
+ events.remove(event);
+ logger.info("Handle task event finished: {}", event);
+ } catch (TaskEventHandleException taskEventHandleException) {
+ // we don't need to resubmit this event, since the worker will
resubmit this event
+ logger.error("Handle task event failed, this event will be
retry later, event: {}", event,
+ taskEventHandleException);
+ } catch (TaskEventHandleError taskEventHandleError) {
+ logger.error("Handle task event error, this event will be
removed, event: {}", event,
+ taskEventHandleError);
+ events.remove(event);
+ } catch (Exception unknownException) {
+ logger.error("Handle task event error, get a unknown
exception, this event will be removed, event: {}",
+ event, unknownException);
+ events.remove(event);
} finally {
- this.events.remove(event);
LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
}
}
@@ -109,156 +101,4 @@ public class TaskExecuteRunnable implements Runnable {
return this.events.add(event);
}
- /**
- * persist task event
- *
- * @param taskEvent taskEvent
- */
- private void persist(TaskEvent taskEvent) throws Exception {
- Event event = taskEvent.getEvent();
- int taskInstanceId = taskEvent.getTaskInstanceId();
- int processInstanceId = taskEvent.getProcessInstanceId();
-
- Optional<TaskInstance> taskInstance;
- WorkflowExecuteRunnable workflowExecuteRunnable =
-
this.processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
- if (workflowExecuteRunnable != null &&
workflowExecuteRunnable.checkTaskInstanceById(taskInstanceId)) {
- taskInstance =
workflowExecuteRunnable.getTaskInstance(taskInstanceId);
- } else {
- taskInstance =
Optional.ofNullable(processService.findTaskInstanceById(taskInstanceId));
- }
-
- boolean needToSendEvent = true;
- switch (event) {
- case DISPATCH:
- needToSendEvent = handleDispatchEvent(taskEvent, taskInstance);
- // dispatch event do not need to submit state event
- break;
- case DELAY:
- case RUNNING:
- needToSendEvent = handleRunningEvent(taskEvent, taskInstance);
- break;
- case RESULT:
- needToSendEvent = handleResultEvent(taskEvent, taskInstance);
- break;
- case WORKER_REJECT:
- needToSendEvent =
- handleWorkerRejectEvent(taskEvent.getChannel(),
taskInstance, workflowExecuteRunnable);
- break;
- default:
- throw new IllegalArgumentException("invalid event type : " +
event);
- }
- if (!needToSendEvent) {
- logger.info("Handle task event: {} success, there is no need to
send a StateEvent", taskEvent);
- return;
- }
-
- StateEvent stateEvent = new StateEvent();
- stateEvent.setProcessInstanceId(taskEvent.getProcessInstanceId());
- stateEvent.setTaskInstanceId(taskEvent.getTaskInstanceId());
- stateEvent.setExecutionStatus(taskEvent.getState());
- stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
- workflowExecuteThreadPool.submitStateEvent(stateEvent);
- }
-
- /**
- * handle dispatch event
- */
- private boolean handleDispatchEvent(TaskEvent taskEvent,
Optional<TaskInstance> taskInstanceOptional) {
- if (!taskInstanceOptional.isPresent()) {
- logger.error("taskInstance is null");
- return false;
- }
- TaskInstance taskInstance = taskInstanceOptional.get();
- if (taskInstance.getState() != ExecutionStatus.SUBMITTED_SUCCESS) {
- return false;
- }
- taskInstance.setState(ExecutionStatus.DISPATCH);
- taskInstance.setHost(taskEvent.getWorkerAddress());
- processService.saveTaskInstance(taskInstance);
- return true;
- }
-
- /**
- * handle running event
- */
- private boolean handleRunningEvent(TaskEvent taskEvent,
Optional<TaskInstance> taskInstanceOptional) {
- Channel channel = taskEvent.getChannel();
- if (taskInstanceOptional.isPresent()) {
- TaskInstance taskInstance = taskInstanceOptional.get();
- if (taskInstance.getState().typeIsFinished()) {
- logger.warn("task is finish, running event is meaningless,
taskInstanceId:{}, state:{}",
- taskInstance.getId(),
- taskInstance.getState());
- return false;
- } else {
- taskInstance.setState(taskEvent.getState());
- taskInstance.setStartTime(taskEvent.getStartTime());
- taskInstance.setHost(taskEvent.getWorkerAddress());
- taskInstance.setLogPath(taskEvent.getLogPath());
- taskInstance.setExecutePath(taskEvent.getExecutePath());
- taskInstance.setPid(taskEvent.getProcessId());
- taskInstance.setAppLink(taskEvent.getAppIds());
- processService.saveTaskInstance(taskInstance);
- }
- }
- // if taskInstance is null (maybe deleted) or finish. retry will be
meaningless . so ack success
- // send ack to worker
- TaskExecuteRunningAckCommand taskExecuteRunningAckCommand =
- new
TaskExecuteRunningAckCommand(ExecutionStatus.SUCCESS.getCode(),
taskEvent.getTaskInstanceId());
- channel.writeAndFlush(taskExecuteRunningAckCommand.convert2Command());
- return true;
- }
-
- /**
- * handle result event
- */
- private boolean handleResultEvent(TaskEvent taskEvent,
Optional<TaskInstance> taskInstanceOptional) {
- Channel channel = taskEvent.getChannel();
- if (taskInstanceOptional.isPresent()) {
- TaskInstance taskInstance = taskInstanceOptional.get();
- if (taskInstance.getState().typeIsFinished()) {
- logger.warn("The current taskInstance has already been
finished, taskEvent: {}", taskEvent);
- return false;
- }
-
- dataQualityResultOperator.operateDqExecuteResult(taskEvent,
taskInstance);
-
- taskInstance.setStartTime(taskEvent.getStartTime());
- taskInstance.setHost(taskEvent.getWorkerAddress());
- taskInstance.setLogPath(taskEvent.getLogPath());
- taskInstance.setExecutePath(taskEvent.getExecutePath());
- taskInstance.setPid(taskEvent.getProcessId());
- taskInstance.setAppLink(taskEvent.getAppIds());
- taskInstance.setState(taskEvent.getState());
- taskInstance.setEndTime(taskEvent.getEndTime());
- taskInstance.setVarPool(taskEvent.getVarPool());
- processService.changeOutParam(taskInstance);
- processService.saveTaskInstance(taskInstance);
- }
- // if taskInstance is null (maybe deleted) . retry will be meaningless
. so response success
- TaskExecuteResponseAckCommand taskExecuteResponseAckCommand =
- new
TaskExecuteResponseAckCommand(ExecutionStatus.SUCCESS.getCode(),
taskEvent.getTaskInstanceId());
- channel.writeAndFlush(taskExecuteResponseAckCommand.convert2Command());
- return true;
- }
-
- /**
- * handle result event
- */
- private boolean handleWorkerRejectEvent(Channel channel,
- Optional<TaskInstance>
taskInstanceOptional,
- WorkflowExecuteRunnable
executeThread) throws Exception {
- TaskInstance taskInstance =
- taskInstanceOptional.orElseThrow(() -> new
RuntimeException("taskInstance is null"));
- if (executeThread != null) {
- executeThread.resubmit(taskInstance.getTaskCode());
- }
- if (channel != null) {
- TaskRecallAckCommand taskRecallAckCommand =
- new TaskRecallAckCommand(ExecutionStatus.SUCCESS.getCode(),
taskInstance.getId());
- channel.writeAndFlush(taskRecallAckCommand.convert2Command());
- }
- return true;
- }
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java
index ae578d4c03..8d5f2fd3db 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java
@@ -17,12 +17,14 @@
package org.apache.dolphinscheduler.server.master.processor.queue;
+import org.apache.dolphinscheduler.common.enums.TaskEventType;
import
org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
-import
org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
-import org.apache.dolphinscheduler.server.utils.DataQualityResultOperator;
-import org.apache.dolphinscheduler.service.process.ProcessService;
+import org.apache.dolphinscheduler.server.master.event.TaskEventHandler;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.PostConstruct;
@@ -48,21 +50,10 @@ public class TaskExecuteThreadPool extends
ThreadPoolTaskExecutor {
@Autowired
private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
- /**
- * process service
- */
- @Autowired
- private ProcessService processService;
-
- /**
- * data quality result operator
- */
@Autowired
- private DataQualityResultOperator dataQualityResultOperator;
+ private List<TaskEventHandler> taskEventHandlerList;
-
- @Autowired
- private WorkflowExecuteThreadPool workflowExecuteThreadPool;
+ private Map<TaskEventType, TaskEventHandler> taskEventHandlerMap = new
HashMap<>();
/**
* task event thread map
@@ -75,6 +66,8 @@ public class TaskExecuteThreadPool extends
ThreadPoolTaskExecutor {
this.setThreadNamePrefix("Task-Execute-Thread-");
this.setMaxPoolSize(masterConfig.getExecThreads());
this.setCorePoolSize(masterConfig.getExecThreads());
+ taskEventHandlerList.forEach(
+ taskEventHandler ->
taskEventHandlerMap.put(taskEventHandler.getHandleEventType(),
taskEventHandler));
}
public void submitTaskEvent(TaskEvent taskEvent) {
@@ -83,11 +76,7 @@ public class TaskExecuteThreadPool extends
ThreadPoolTaskExecutor {
return;
}
TaskExecuteRunnable taskExecuteRunnable =
taskExecuteThreadMap.computeIfAbsent(taskEvent.getProcessInstanceId(),
- (processInstanceId) -> new TaskExecuteRunnable(processInstanceId,
- processService,
- workflowExecuteThreadPool,
- processInstanceExecCacheManager,
- dataQualityResultOperator));
+ (processInstanceId) -> new TaskExecuteRunnable(processInstanceId,
taskEventHandlerMap));
taskExecuteRunnable.addEvent(taskEvent);
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java
similarity index 57%
rename from
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
rename to
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java
index e88d84312c..870cc1b24c 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java
@@ -30,6 +30,9 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import
org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import
org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
+import org.apache.dolphinscheduler.server.master.event.WorkflowEvent;
+import org.apache.dolphinscheduler.server.master.event.WorkflowEventQueue;
+import org.apache.dolphinscheduler.server.master.event.WorkflowEventType;
import org.apache.dolphinscheduler.server.master.exception.MasterException;
import org.apache.dolphinscheduler.server.master.metrics.MasterServerMetrics;
import
org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics;
@@ -43,9 +46,7 @@ import org.apache.commons.collections4.CollectionUtils;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import org.slf4j.Logger;
@@ -53,18 +54,13 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
-import lombok.NonNull;
-
/**
* Master scheduler thread, this thread will consume the commands from
database and trigger processInstance executed.
*/
@Service
-public class MasterSchedulerService extends BaseDaemonThread {
+public class MasterSchedulerBootstrap extends BaseDaemonThread {
- /**
- * logger of MasterSchedulerService
- */
- private static final Logger logger =
LoggerFactory.getLogger(MasterSchedulerService.class);
+ private static final Logger logger =
LoggerFactory.getLogger(MasterSchedulerBootstrap.class);
@Autowired
private ProcessService processService;
@@ -83,12 +79,6 @@ public class MasterSchedulerService extends BaseDaemonThread
{
*/
private ThreadPoolExecutor masterPrepareExecService;
- /**
- * workflow exec service
- */
- @Autowired
- private WorkflowExecuteThreadPool workflowExecuteThreadPool;
-
@Autowired
private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
@@ -98,13 +88,15 @@ public class MasterSchedulerService extends
BaseDaemonThread {
@Autowired
private CuringParamsService curingGlobalParamsService;
- private final LinkedBlockingQueue<ProcessInstance>
submitFailedProcessInstances = new LinkedBlockingQueue<>();
+ @Autowired
+ private WorkflowEventQueue workflowEventQueue;
- private Thread failedProcessInstanceResubmitThread;
+ @Autowired
+ private WorkflowEventLooper workflowEventLooper;
private String masterAddress;
- protected MasterSchedulerService() {
+ protected MasterSchedulerBootstrap() {
super("MasterCommandLoopThread");
}
@@ -114,23 +106,19 @@ public class MasterSchedulerService extends
BaseDaemonThread {
public void init() {
this.masterPrepareExecService = (ThreadPoolExecutor)
ThreadUtils.newDaemonFixedThreadExecutor("MasterPreExecThread",
masterConfig.getPreExecThreads());
this.masterAddress = NetUtils.getAddr(masterConfig.getListenPort());
- this.failedProcessInstanceResubmitThread = new
FailedProcessInstanceResubmitThread(submitFailedProcessInstances);
-
ProcessInstanceMetrics.registerProcessInstanceResubmitGauge(submitFailedProcessInstances::size);
}
@Override
public synchronized void start() {
- logger.info("Master schedule service starting..");
+ logger.info("Master schedule bootstrap starting..");
super.start();
- this.failedProcessInstanceResubmitThread.start();
- logger.info("Master schedule service started...");
+ workflowEventLooper.start();
+ logger.info("Master schedule bootstrap started...");
}
public void close() {
- logger.info("Master schedule service stopping...");
- // these process instances will be failover, so we can safa clear here
- submitFailedProcessInstances.clear();
- logger.info("Master schedule service stopped...");
+ logger.info("Master schedule bootstrap stopping...");
+ logger.info("Master schedule bootstrap stopped...");
}
/**
@@ -140,15 +128,51 @@ public class MasterSchedulerService extends
BaseDaemonThread {
public void run() {
while (Stopper.isRunning()) {
try {
- boolean isOverload =
OSUtils.isOverload(masterConfig.getMaxCpuLoadAvg(),
masterConfig.getReservedMemory());
+ // todo: if the workflow event queue grows too large, we need to
apply back pressure
+ boolean isOverload =
+ OSUtils.isOverload(masterConfig.getMaxCpuLoadAvg(),
masterConfig.getReservedMemory());
if (isOverload) {
MasterServerMetrics.incMasterOverload();
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
continue;
}
- scheduleWorkflow();
+ List<Command> commands = findCommands();
+ if (CollectionUtils.isEmpty(commands)) {
+ // no command was found, sleep for 1s
+ Thread.sleep(Constants.SLEEP_TIME_MILLIS);
+ continue;
+ }
+
+ List<ProcessInstance> processInstances =
command2ProcessInstance(commands);
+ if (CollectionUtils.isEmpty(processInstances)) {
+ // an empty result indicates that transforming the commands to
process instances failed, sleep for 1s
+ Thread.sleep(Constants.SLEEP_TIME_MILLIS);
+ continue;
+ }
+ MasterServerMetrics.incMasterConsumeCommand(commands.size());
+
+ processInstances.forEach(processInstance -> {
+ try {
+
LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
+ if
(processInstanceExecCacheManager.contains(processInstance.getId())) {
+ logger.error("The workflow instance is already
been cached, this case shouldn't be happened");
+ }
+ WorkflowExecuteRunnable workflowRunnable = new
WorkflowExecuteRunnable(processInstance,
+
processService,
+
nettyExecutorManager,
+
processAlertManager,
+
masterConfig,
+
stateWheelExecuteThread,
+
curingGlobalParamsService);
+
processInstanceExecCacheManager.cache(processInstance.getId(),
workflowRunnable);
+ workflowEventQueue.addEvent(new
WorkflowEvent(WorkflowEventType.START_WORKFLOW,
+
processInstance.getId()));
+ } finally {
+ LoggerUtils.removeWorkflowInstanceIdMDC();
+ }
+ });
} catch (InterruptedException interruptedException) {
- logger.warn("Master schedule service interrupted, close the
loop", interruptedException);
+ logger.warn("Master schedule bootstrap interrupted, close the
loop", interruptedException);
Thread.currentThread().interrupt();
break;
} catch (Exception e) {
@@ -159,72 +183,9 @@ public class MasterSchedulerService extends
BaseDaemonThread {
}
}
- /**
- * Query command from database by slot, and transform to workflow
instance, then submit to workflowExecuteThreadPool.
- */
- private void scheduleWorkflow() throws InterruptedException,
MasterException {
- List<Command> commands = findCommands();
- if (CollectionUtils.isEmpty(commands)) {
- // indicate that no command ,sleep for 1s
- Thread.sleep(Constants.SLEEP_TIME_MILLIS);
- return;
- }
-
- List<ProcessInstance> processInstances =
command2ProcessInstance(commands);
- if (CollectionUtils.isEmpty(processInstances)) {
- // indicate that the command transform to processInstance error,
sleep for 1s
- Thread.sleep(Constants.SLEEP_TIME_MILLIS);
- return;
- }
- MasterServerMetrics.incMasterConsumeCommand(commands.size());
-
- for (ProcessInstance processInstance : processInstances) {
- submitProcessInstance(processInstance);
- }
- }
-
- private void submitProcessInstance(@NonNull ProcessInstance
processInstance) {
- try {
- LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
- logger.info("Master schedule service starting workflow instance");
- final WorkflowExecuteRunnable workflowExecuteRunnable = new
WorkflowExecuteRunnable(
- processInstance
- , processService
- , nettyExecutorManager
- , processAlertManager
- , masterConfig
- , stateWheelExecuteThread
- , curingGlobalParamsService);
-
-
this.processInstanceExecCacheManager.cache(processInstance.getId(),
workflowExecuteRunnable);
- if (processInstance.getTimeout() > 0) {
-
stateWheelExecuteThread.addProcess4TimeoutCheck(processInstance);
- }
- ProcessInstanceMetrics.incProcessInstanceSubmit();
- CompletableFuture<WorkflowSubmitStatue> workflowSubmitFuture =
CompletableFuture.supplyAsync(
- workflowExecuteRunnable::call, workflowExecuteThreadPool);
- workflowSubmitFuture.thenAccept(workflowSubmitStatue -> {
- if (WorkflowSubmitStatue.FAILED == workflowSubmitStatue) {
- // submit failed
-
processInstanceExecCacheManager.removeByProcessInstanceId(processInstance.getId());
-
stateWheelExecuteThread.removeProcess4TimeoutCheck(processInstance.getId());
- submitFailedProcessInstances.add(processInstance);
- }
- });
- logger.info("Master schedule service started workflow instance");
-
- } catch (Exception ex) {
-
processInstanceExecCacheManager.removeByProcessInstanceId(processInstance.getId());
-
stateWheelExecuteThread.removeProcess4TimeoutCheck(processInstance.getId());
- logger.info("Master submit workflow to thread pool failed, will
remove workflow runnable from cache manager", ex);
- } finally {
- LoggerUtils.removeWorkflowInstanceIdMDC();
- }
- }
-
private List<ProcessInstance> command2ProcessInstance(List<Command>
commands) throws InterruptedException {
long commandTransformStartTime = System.currentTimeMillis();
- logger.info("Master schedule service transforming command to
ProcessInstance, commandSize: {}", commands.size());
+ logger.info("Master schedule bootstrap transforming command to
ProcessInstance, commandSize: {}", commands.size());
List<ProcessInstance> processInstances =
Collections.synchronizedList(new ArrayList<>(commands.size()));
CountDownLatch latch = new CountDownLatch(commands.size());
for (final Command command : commands) {
@@ -254,7 +215,7 @@ public class MasterSchedulerService extends
BaseDaemonThread {
// make sure to finish handling command each time before next scan
latch.await();
- logger.info("Master schedule service transformed command to
ProcessInstance, commandSize: {}, processInstanceSize: {}",
+ logger.info("Master schedule bootstrap transformed command to
ProcessInstance, commandSize: {}, processInstanceSize: {}",
commands.size(), processInstances.size());
ProcessInstanceMetrics.recordProcessInstanceGenerateTime(System.currentTimeMillis()
- commandTransformStartTime);
return processInstances;
@@ -273,7 +234,7 @@ public class MasterSchedulerService extends
BaseDaemonThread {
int pageSize = masterConfig.getFetchCommandNum();
final List<Command> result =
processService.findCommandPageBySlot(pageSize, pageNumber, masterCount,
thisMasterSlot);
if (CollectionUtils.isNotEmpty(result)) {
- logger.info("Master schedule service loop command success,
command size: {}, current slot: {}, total slot size: {}",
+ logger.info("Master schedule bootstrap loop command success,
command size: {}, current slot: {}, total slot size: {}",
result.size(), thisMasterSlot, masterCount);
}
ProcessInstanceMetrics.recordCommandQueryTime(System.currentTimeMillis() -
scheduleStartTime);
@@ -297,34 +258,4 @@ public class MasterSchedulerService extends
BaseDaemonThread {
return state;
}
- private class FailedProcessInstanceResubmitThread extends Thread {
-
- private final LinkedBlockingQueue<ProcessInstance>
submitFailedProcessInstances;
-
- public
FailedProcessInstanceResubmitThread(LinkedBlockingQueue<ProcessInstance>
submitFailedProcessInstances) {
- logger.info("Starting workflow resubmit thread");
- this.submitFailedProcessInstances = submitFailedProcessInstances;
- this.setDaemon(true);
- this.setName("SubmitFailedProcessInstanceHandleThread");
- logger.info("Started workflow resubmit thread");
- }
-
- @Override
- public void run() {
- while (Stopper.isRunning()) {
- try {
- ProcessInstance processInstance =
submitFailedProcessInstances.take();
- submitProcessInstance(processInstance);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- logger.warn("SubmitFailedProcessInstanceHandleThread has
been interrupted, will return");
- break;
- }
-
- // avoid the failed-fast cause CPU higher
- ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
- }
- }
- }
-
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowEventLooper.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowEventLooper.java
new file mode 100644
index 0000000000..ee2e70bfd0
--- /dev/null
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowEventLooper.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.runner;
+
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
+import org.apache.dolphinscheduler.common.thread.Stopper;
+import org.apache.dolphinscheduler.common.thread.ThreadUtils;
+import org.apache.dolphinscheduler.common.utils.LoggerUtils;
+import org.apache.dolphinscheduler.server.master.event.WorkflowEvent;
+import
org.apache.dolphinscheduler.server.master.event.WorkflowEventHandleError;
+import
org.apache.dolphinscheduler.server.master.event.WorkflowEventHandleException;
+import org.apache.dolphinscheduler.server.master.event.WorkflowEventHandler;
+import org.apache.dolphinscheduler.server.master.event.WorkflowEventQueue;
+import org.apache.dolphinscheduler.server.master.event.WorkflowEventType;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.annotation.PostConstruct;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+@Component
+public class WorkflowEventLooper extends BaseDaemonThread {
+
+ private final Logger logger =
LoggerFactory.getLogger(WorkflowEventLooper.class);
+
+ @Autowired
+ private WorkflowEventQueue workflowEventQueue;
+
+ @Autowired
+ private List<WorkflowEventHandler> workflowEventHandlerList;
+
+ private final Map<WorkflowEventType, WorkflowEventHandler>
workflowEventHandlerMap = new HashMap<>();
+
+ protected WorkflowEventLooper() {
+ super("WorkflowEventLooper");
+ }
+
+ @PostConstruct
+ public void init() {
+ workflowEventHandlerList.forEach(workflowEventHandler ->
workflowEventHandlerMap.put(workflowEventHandler.getHandleWorkflowEventType(),
+
workflowEventHandler));
+ }
+
+ @Override
+ public synchronized void start() {
+ logger.info("WorkflowEventLooper thread starting");
+ super.start();
+ logger.info("WorkflowEventLooper thread started");
+ }
+
+ public void run() {
+ WorkflowEvent workflowEvent = null;
+ while (Stopper.isRunning()) {
+ try {
+ workflowEvent = workflowEventQueue.poolEvent();
+
LoggerUtils.setWorkflowInstanceIdMDC(workflowEvent.getWorkflowInstanceId());
+ logger.info("Workflow event looper receive a workflow event:
{}, will handle this", workflowEvent);
+ WorkflowEventHandler workflowEventHandler =
+
workflowEventHandlerMap.get(workflowEvent.getWorkflowEventType());
+ workflowEventHandler.handleWorkflowEvent(workflowEvent);
+ } catch (InterruptedException e) {
+ logger.warn("WorkflowEventLooper thread is interrupted, will
close this loop", e);
+ Thread.currentThread().interrupt();
+ break;
+ } catch (WorkflowEventHandleException
workflowEventHandleException) {
+ logger.error("Handle workflow event failed, will add this
event to event queue again, event: {}",
+ workflowEvent, workflowEventHandleException);
+ workflowEventQueue.addEvent(workflowEvent);
+ ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
+ } catch (WorkflowEventHandleError workflowEventHandleError) {
+ logger.error("Handle workflow event error, will drop this
event, event: {}",
+ workflowEvent,
+ workflowEventHandleError);
+ } catch (Exception unknownException) {
+ logger.error(
+ "Handle workflow event failed, get a unknown exception,
will add this event to event queue again, event: {}",
+ workflowEvent, unknownException);
+ workflowEventQueue.addEvent(workflowEvent);
+ ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
+ } finally {
+ LoggerUtils.removeWorkflowInstanceIdMDC();
+ }
+ }
+ }
+
+}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index 1834e413f8..94f5f51b80 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -119,39 +119,18 @@ import lombok.NonNull;
*/
public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue>
{
- /**
- * logger of WorkflowExecuteThread
- */
private static final Logger logger =
LoggerFactory.getLogger(WorkflowExecuteRunnable.class);
- /**
- * process service
- */
private final ProcessService processService;
- /**
- * alert manager
- */
private final ProcessAlertManager processAlertManager;
- /**
- * netty executor manager
- */
private final NettyExecutorManager nettyExecutorManager;
- /**
- * process instance
- */
private final ProcessInstance processInstance;
- /**
- * process definition
- */
private ProcessDefinition processDefinition;
- /**
- * the object of DAG
- */
private DAG<String, TaskNode, TaskNodeRelation> dag;
/**
@@ -159,10 +138,8 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatue> {
*/
private String key;
- /**
- * start flag, true: start nodes submit completely
- */
- private volatile boolean isStart = false;
+
+ private WorkflowRunnableStatus workflowRunnableStatus =
WorkflowRunnableStatus.CREATED;
/**
* submit failure nodes
@@ -224,7 +201,7 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatue> {
private final ConcurrentLinkedQueue<StateEvent> stateEvents = new
ConcurrentLinkedQueue<>();
/**
- * ready to submit task queue
+ * The standby task list: these tasks will be executed. Note that a taskInstance
in this queue may not have an id.
*/
private final PeerTaskInstancePriorityQueue readyToSubmitTaskQueue = new
PeerTaskInstancePriorityQueue();
@@ -234,14 +211,8 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatue> {
*/
private final Map<Long, TaskInstance> waitToRetryTaskInstanceMap = new
ConcurrentHashMap<>();
- /**
- * state wheel execute thread
- */
private final StateWheelExecuteThread stateWheelExecuteThread;
- /**
- * curing global params service
- */
private final CuringParamsService curingParamsService;
private final String masterAddress;
@@ -276,14 +247,18 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatue> {
* the process start nodes are submitted completely.
*/
public boolean isStart() {
- return this.isStart;
+ return WorkflowRunnableStatus.STARTED == workflowRunnableStatus;
}
/**
* handle event
*/
public void handleEvents() {
- if (!isStart) {
+ if (!isStart()) {
+ // stateEvents is a queue; log its size, not its full contents
+ logger.info(
+ "The workflow instance is not started, will not handle its
state event, current state event size: {}",
+ stateEvents.size());
return;
}
StateEvent stateEvent = null;
@@ -387,45 +361,53 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatue> {
}
public void taskFinished(TaskInstance taskInstance) throws
StateEventHandleException {
- logger.info("TaskInstance finished task code:{} state:{} ",
- taskInstance.getTaskCode(),
- taskInstance.getState());
-
- activeTaskProcessorMaps.remove(taskInstance.getTaskCode());
- stateWheelExecuteThread.removeTask4TimeoutCheck(processInstance,
taskInstance);
- stateWheelExecuteThread.removeTask4RetryCheck(processInstance,
taskInstance);
- stateWheelExecuteThread.removeTask4StateCheck(processInstance,
taskInstance);
-
- if (taskInstance.getState().typeIsSuccess()) {
- completeTaskMap.put(taskInstance.getTaskCode(),
taskInstance.getId());
- // todo: merge the last taskInstance
- processInstance.setVarPool(taskInstance.getVarPool());
- processService.saveProcessInstance(processInstance);
- if (!processInstance.isBlocked()) {
- submitPostNode(Long.toString(taskInstance.getTaskCode()));
- }
- } else if (taskInstance.taskCanRetry() && processInstance.getState()
!= ExecutionStatus.READY_STOP) {
- // retry task
- logger.info("Retry taskInstance taskInstance state: {}",
taskInstance.getState());
- retryTaskInstance(taskInstance);
- } else if (taskInstance.getState().typeIsFailure()) {
- completeTaskMap.put(taskInstance.getTaskCode(),
taskInstance.getId());
- // There are child nodes and the failure policy is: CONTINUE
- if (processInstance.getFailureStrategy() ==
FailureStrategy.CONTINUE
- &&
DagHelper.haveAllNodeAfterNode(Long.toString(taskInstance.getTaskCode()), dag))
{
- submitPostNode(Long.toString(taskInstance.getTaskCode()));
- } else {
- errorTaskMap.put(taskInstance.getTaskCode(),
taskInstance.getId());
- if (processInstance.getFailureStrategy() ==
FailureStrategy.END) {
- killAllTasks();
+ logger.info("TaskInstance finished task code:{} state:{}",
taskInstance.getTaskCode(), taskInstance.getState());
+ try {
+
+ activeTaskProcessorMaps.remove(taskInstance.getTaskCode());
+ stateWheelExecuteThread.removeTask4TimeoutCheck(processInstance,
taskInstance);
+ stateWheelExecuteThread.removeTask4RetryCheck(processInstance,
taskInstance);
+ stateWheelExecuteThread.removeTask4StateCheck(processInstance,
taskInstance);
+
+ if (taskInstance.getState().typeIsSuccess()) {
+ completeTaskMap.put(taskInstance.getTaskCode(),
taskInstance.getId());
+ // todo: merge the last taskInstance
+ processInstance.setVarPool(taskInstance.getVarPool());
+ processService.saveProcessInstance(processInstance);
+ if (!processInstance.isBlocked()) {
+ submitPostNode(Long.toString(taskInstance.getTaskCode()));
+ }
+ } else if (taskInstance.taskCanRetry() &&
processInstance.getState() != ExecutionStatus.READY_STOP) {
+ // retry task
+ logger.info("Retry taskInstance taskInstance state: {}",
taskInstance.getState());
+ retryTaskInstance(taskInstance);
+ } else if (taskInstance.getState().typeIsFailure()) {
+ completeTaskMap.put(taskInstance.getTaskCode(),
taskInstance.getId());
+ // There are child nodes and the failure policy is: CONTINUE
+ if (processInstance.getFailureStrategy() ==
FailureStrategy.CONTINUE && DagHelper.haveAllNodeAfterNode(
+ Long.toString(taskInstance.getTaskCode()),
+ dag)) {
+ submitPostNode(Long.toString(taskInstance.getTaskCode()));
+ } else {
+ errorTaskMap.put(taskInstance.getTaskCode(),
taskInstance.getId());
+ if (processInstance.getFailureStrategy() ==
FailureStrategy.END) {
+ killAllTasks();
+ }
}
+ } else if (taskInstance.getState().typeIsFinished()) {
+ // todo: when the task instance type is pause, then it should
not in completeTaskMap
+ completeTaskMap.put(taskInstance.getTaskCode(),
taskInstance.getId());
}
- } else if (taskInstance.getState().typeIsFinished()) {
- // todo: when the task instance type is pause, then it should not
in completeTaskMap
- completeTaskMap.put(taskInstance.getTaskCode(),
taskInstance.getId());
+ logger.info("TaskInstance finished will try to update the workflow
instance state, task code:{} state:{}",
+ taskInstance.getTaskCode(),
+ taskInstance.getState());
+ this.updateProcessInstanceState();
+ } catch (Exception ex) {
+ logger.error("Task finish failed, get a exception, will remove
this taskInstance from completeTaskMap", ex);
+ // Remove the task from the complete map so that it can be finished on
the next attempt.
+ completeTaskMap.remove(taskInstance.getTaskCode());
+ throw ex;
}
-
- this.updateProcessInstanceState();
}
/**
@@ -672,17 +654,29 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatue> {
*/
@Override
public WorkflowSubmitStatue call() {
- if (this.taskInstanceMap.size() > 0 || isStart) {
- logger.warn("The workflow has already been started");
+ if (isStart()) {
+ // This case should not happen
+ logger.warn("[WorkflowInstance-{}] The workflow has already been
started", processInstance.getId());
return WorkflowSubmitStatue.DUPLICATED_SUBMITTED;
}
try {
LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
- buildFlowDag();
- initTaskQueue();
- submitPostNode(null);
- isStart = true;
+ if (workflowRunnableStatus == WorkflowRunnableStatus.CREATED) {
+ buildFlowDag();
+ workflowRunnableStatus = WorkflowRunnableStatus.INITIALIZE_DAG;
+ logger.info("workflowStatue changed to :{}",
workflowRunnableStatus);
+ }
+ if (workflowRunnableStatus ==
WorkflowRunnableStatus.INITIALIZE_DAG) {
+ initTaskQueue();
+ workflowRunnableStatus =
WorkflowRunnableStatus.INITIALIZE_QUEUE;
+ logger.info("workflowStatue changed to :{}",
workflowRunnableStatus);
+ }
+ if (workflowRunnableStatus ==
WorkflowRunnableStatus.INITIALIZE_QUEUE) {
+ submitPostNode(null);
+ workflowRunnableStatus = WorkflowRunnableStatus.STARTED;
+ logger.info("workflowStatue changed to :{}",
workflowRunnableStatus);
+ }
return WorkflowSubmitStatue.SUCCESS;
} catch (Exception e) {
logger.error("Start workflow error", e);
@@ -749,9 +743,6 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatue> {
* @throws Exception exception
*/
private void buildFlowDag() throws Exception {
- if (this.dag != null) {
- return;
- }
processDefinition =
processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion());
processInstance.setProcessDefinition(processDefinition);
@@ -886,9 +877,9 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatue> {
}
}
logger.info("Initialize task queue, dependFailedTaskMap: {},
completeTaskMap: {}, errorTaskMap: {}",
- dependFailedTaskMap,
- completeTaskMap,
- errorTaskMap);
+ dependFailedTaskMap,
+ completeTaskMap,
+ errorTaskMap);
}
/**
@@ -911,7 +902,11 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatue> {
boolean submit = taskProcessor.action(TaskAction.SUBMIT);
if (!submit) {
- logger.error("process id:{} name:{} submit standby task id:{}
name:{} failed!", processInstance.getId(), processInstance.getName(),
taskInstance.getId(), taskInstance.getName());
+ logger.error("process id:{} name:{} submit standby task id:{}
name:{} failed!",
+ processInstance.getId(),
+ processInstance.getName(),
+ taskInstance.getId(),
+ taskInstance.getName());
return Optional.empty();
}
@@ -962,7 +957,10 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatue> {
}
return Optional.of(taskInstance);
} catch (Exception e) {
- logger.error("submit standby task error", e);
+ logger.error("submit standby task error, taskCode: {},
taskInstanceId: {}",
+ taskInstance.getTaskCode(),
+ taskInstance.getId(),
+ e);
return Optional.empty();
}
}
@@ -1215,9 +1213,19 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatue> {
*/
private Map<String, TaskInstance> getCompleteTaskInstanceMap() {
Map<String, TaskInstance> completeTaskInstanceMap = new HashMap<>();
- for (Integer taskInstanceId : completeTaskMap.values()) {
+ for (Map.Entry<Long, Integer> entry : completeTaskMap.entrySet()) {
+ Long taskCode = entry.getKey();
+ Integer taskInstanceId = entry.getValue();
TaskInstance taskInstance = taskInstanceMap.get(taskInstanceId);
+ if (taskInstance == null) {
+ logger.warn("Cannot find the taskInstance from
taskInstanceMap, taskInstanceId: {}, taskCode: {}",
+ taskInstanceId,
+ taskCode);
+ // This happens when submitting the task to the db failed, in which
case the taskInstanceId is 0
+ continue;
+ }
completeTaskInstanceMap.put(Long.toString(taskInstance.getTaskCode()),
taskInstance);
+
}
return completeTaskInstanceMap;
}
@@ -1429,6 +1437,7 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatue> {
*/
private boolean processFailed() {
if (hasFailedTask()) {
+ logger.info("The current process has failed task, the current
process failed");
if (processInstance.getFailureStrategy() == FailureStrategy.END) {
return true;
}
@@ -1504,22 +1513,31 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatue> {
if (activeTaskProcessorMaps.size() > 0 || hasRetryTaskInStandBy()) {
// active task and retry task exists
- return runningState(state);
+ ExecutionStatus executionStatus = runningState(state);
+ logger.info("The workflowInstance has task running, the
workflowInstance status is {}", executionStatus);
+ return executionStatus;
}
// block
if (state == ExecutionStatus.READY_BLOCK) {
- return processReadyBlock();
+ ExecutionStatus executionStatus = processReadyBlock();
+ logger.info("The workflowInstance is ready to block, the
workflowInstance status is {}", executionStatus);
+ // must return: falling through would let the later checks override this status
+ return executionStatus;
}
// waiting thread
if (hasWaitingThreadTask()) {
+ logger.info("The workflowInstance has waiting thread task, the
workflow status is {}",
+ ExecutionStatus.WAITING_THREAD);
return ExecutionStatus.WAITING_THREAD;
}
// pause
if (state == ExecutionStatus.READY_PAUSE) {
- return processReadyPause();
+ ExecutionStatus executionStatus = processReadyPause();
+ logger.info("The workflowInstance is ready to pause, the workflow
status is {}", executionStatus);
+ return executionStatus;
}
// stop
@@ -1527,15 +1543,20 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatue> {
List<TaskInstance> stopList =
getCompleteTaskByState(ExecutionStatus.STOP);
List<TaskInstance> killList =
getCompleteTaskByState(ExecutionStatus.KILL);
List<TaskInstance> failList =
getCompleteTaskByState(ExecutionStatus.FAILURE);
+ ExecutionStatus executionStatus;
if (CollectionUtils.isNotEmpty(stopList) ||
CollectionUtils.isNotEmpty(killList) || CollectionUtils.isNotEmpty(failList) ||
!isComplementEnd()) {
- return ExecutionStatus.STOP;
+ executionStatus = ExecutionStatus.STOP;
} else {
- return ExecutionStatus.SUCCESS;
+ executionStatus = ExecutionStatus.SUCCESS;
}
+ logger.info("The workflowInstance is ready to stop, the workflow
status is {}", executionStatus);
+ // must return: otherwise the STOP/SUCCESS decision is dropped and processFailed() below decides
+ return executionStatus;
}
// process failure
if (processFailed()) {
+ logger.info("The workflowInstance is failed, the workflow status
is {}", ExecutionStatus.FAILURE);
return ExecutionStatus.FAILURE;
}
@@ -1579,15 +1598,21 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatue> {
private void updateProcessInstanceState() throws StateEventHandleException
{
ExecutionStatus state = getProcessInstanceState(processInstance);
if (processInstance.getState() != state) {
+ logger.info("Update workflowInstance states, origin state: {},
target state: {}",
+ processInstance.getState(),
+ state);
updateWorkflowInstanceStatesToDB(state);
StateEvent stateEvent = new StateEvent();
stateEvent.setExecutionStatus(processInstance.getState());
stateEvent.setProcessInstanceId(this.processInstance.getId());
stateEvent.setType(StateEventType.PROCESS_STATE_CHANGE);
- // this.processStateChangeHandler(stateEvent);
// replace with `stateEvents`, make sure `WorkflowExecuteThread`
can be deleted to avoid memory leaks
this.stateEvents.add(stateEvent);
+ } else {
+ logger.info("There is no need to update the workflow instance
state, origin state: {}, target state: {}",
+ processInstance.getState(),
+ state);
}
}
@@ -1602,12 +1627,9 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatue> {
private void updateWorkflowInstanceStatesToDB(ExecutionStatus newStates)
throws StateEventHandleException {
ExecutionStatus originStates = processInstance.getState();
if (originStates != newStates) {
- logger.info("work flow process instance [id: {}, name:{}], state
change from {} to {}, cmd type: {}",
- processInstance.getId(),
- processInstance.getName(),
- originStates,
- newStates,
- processInstance.getCommandType());
+ logger.info("Begin to update workflow instance state , state will
change from {} to {}",
+ originStates,
+ newStates);
processInstance.setState(newStates);
if (newStates.typeIsFinished()) {
@@ -1657,8 +1679,8 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatue> {
*
* @param taskInstance task instance
*/
- private void removeTaskFromStandbyList(TaskInstance taskInstance) {
- readyToSubmitTaskQueue.remove(taskInstance);
+ private boolean removeTaskFromStandbyList(TaskInstance taskInstance) {
+ return readyToSubmitTaskQueue.remove(taskInstance);
}
/**
@@ -1748,10 +1770,20 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatue> {
if (!taskInstanceOptional.isPresent()) {
this.taskFailedSubmit = true;
// Remove and add to complete map and error map
- removeTaskFromStandbyList(task);
+ if (!removeTaskFromStandbyList(task)) {
+ logger.error(
+ "Task submit failed, remove from standby list
failed, workflowInstanceId: {}, taskCode: {}",
+ processInstance.getId(),
+ task.getTaskCode());
+ }
completeTaskMap.put(task.getTaskCode(), task.getId());
+ taskInstanceMap.put(task.getId(), task);
errorTaskMap.put(task.getTaskCode(), task.getId());
- logger.error("Task submitted failed, processInstanceId:
{}, taskInstanceId: {}", task.getProcessInstanceId(), task.getId());
+ activeTaskProcessorMaps.remove(task.getTaskCode());
+ logger.error("Task submitted failed, workflowInstanceId:
{}, taskInstanceId: {}, taskCode: {}",
+ task.getProcessInstanceId(),
+ task.getId(),
+ task.getTaskCode());
} else {
removeTaskFromStandbyList(task);
}
@@ -1927,4 +1959,10 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatue> {
}
}
+ private enum WorkflowRunnableStatus {
+ CREATED, INITIALIZE_DAG, INITIALIZE_QUEUE, STARTED,
+ ;
+
+ }
+
}
\ No newline at end of file
diff --git
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRecallCommand.java
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRecallCommand.java
index 3d33d8c363..0cd81dc3e2 100644
---
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRecallCommand.java
+++
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRecallCommand.java
@@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.remote.command;
-import org.apache.dolphinscheduler.common.enums.Event;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import java.io.Serializable;
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index 5024a944f1..075991c955 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -1284,8 +1284,8 @@ public class ProcessServiceImpl implements ProcessService
{
break;
}
logger.error(
- "task commit to db failed , taskId {} has already retry {}
times, please check the database",
- taskInstance.getId(),
+ "task commit to db failed , taskCode: {} has already retry
{} times, please check the database",
+ taskInstance.getTaskCode(),
retryTimes);
Thread.sleep(commitInterval);
} catch (Exception e) {
@@ -1298,6 +1298,7 @@ public class ProcessServiceImpl implements ProcessService
{
}
/**
+ * // todo: This method needs to be refactored: we found that when the db is down,
the taskInstanceId is still not 0. It's better to change the return type to void,
rather than return TaskInstance
* submit task to db
* submit sub process to command
*
@@ -1316,9 +1317,9 @@ public class ProcessServiceImpl implements ProcessService
{
TaskInstance task = submitTaskInstanceToDB(taskInstance,
processInstance);
if (task == null) {
logger.error("Save taskInstance to db error, task name:{}, process
id:{} state: {} ",
- taskInstance.getName(),
- taskInstance.getProcessInstance(),
- processInstance.getState());
+ taskInstance.getName(),
+ processInstance.getId(),
+ processInstance.getState());
return null;
}
@@ -1328,8 +1329,8 @@ public class ProcessServiceImpl implements ProcessService
{
logger.info(
"End save taskInstance to db successfully:{}, taskInstanceName:
{}, taskInstance state:{}, processInstanceId:{}, processInstanceState: {}",
- taskInstance.getId(),
- taskInstance.getName(),
+ task.getId(),
+ task.getName(),
task.getState(),
processInstance.getId(),
processInstance.getState());
@@ -1564,7 +1565,10 @@ public class ProcessServiceImpl implements
ProcessService {
public TaskInstance submitTaskInstanceToDB(TaskInstance taskInstance,
ProcessInstance processInstance) {
ExecutionStatus processInstanceState = processInstance.getState();
if (processInstanceState.typeIsFinished() || processInstanceState ==
ExecutionStatus.READY_STOP) {
- logger.warn("processInstance {} was {}, skip submit task",
processInstance.getProcessDefinitionCode(), processInstanceState);
+ logger.warn("processInstance: {} state was: {}, skip submit this
task, taskCode: {}",
+ processInstance.getId(),
+ processInstanceState,
+ taskInstance.getTaskCode());
return null;
}
if (processInstanceState == ExecutionStatus.READY_PAUSE) {
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/cache/ResponseCache.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/cache/ResponseCache.java
index fb3c84da68..f8e47fc43a 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/cache/ResponseCache.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/cache/ResponseCache.java
@@ -17,7 +17,7 @@
package org.apache.dolphinscheduler.server.worker.cache;
-import org.apache.dolphinscheduler.common.enums.Event;
+import org.apache.dolphinscheduler.common.enums.TaskEventType;
import org.apache.dolphinscheduler.remote.command.Command;
import java.util.Map;
@@ -48,7 +48,7 @@ public class ResponseCache {
* @param command command
* @param event event ACK/RESULT
*/
- public void cache(Integer taskInstanceId, Command command, Event event) {
+ public void cache(Integer taskInstanceId, Command command, TaskEventType
event) {
switch (event) {
case RUNNING:
runningCache.put(taskInstanceId, command);
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
index 3f70974344..21616a1f60 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
@@ -19,7 +19,7 @@ package org.apache.dolphinscheduler.server.worker.processor;
import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
-import org.apache.dolphinscheduler.common.enums.Event;
+import org.apache.dolphinscheduler.common.enums.TaskEventType;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
@@ -236,7 +236,7 @@ public class TaskCallbackService {
public void sendTaskExecuteRunningCommand(TaskExecutionContext
taskExecutionContext) {
TaskExecuteRunningCommand command =
buildTaskExecuteRunningCommand(taskExecutionContext);
// add response cache
- ResponseCache.get().cache(taskExecutionContext.getTaskInstanceId(),
command.convert2Command(), Event.RUNNING);
+ ResponseCache.get().cache(taskExecutionContext.getTaskInstanceId(),
command.convert2Command(), TaskEventType.RUNNING);
send(taskExecutionContext.getTaskInstanceId(),
command.convert2Command());
}
@@ -256,7 +256,7 @@ public class TaskCallbackService {
public void sendTaskExecuteResponseCommand(TaskExecutionContext
taskExecutionContext) {
TaskExecuteResponseCommand command =
buildTaskExecuteResponseCommand(taskExecutionContext);
// add response cache
- ResponseCache.get().cache(taskExecutionContext.getTaskInstanceId(),
command.convert2Command(), Event.RESULT);
+ ResponseCache.get().cache(taskExecutionContext.getTaskInstanceId(),
command.convert2Command(), TaskEventType.RESULT);
send(taskExecutionContext.getTaskInstanceId(),
command.convert2Command());
}
@@ -270,7 +270,7 @@ public class TaskCallbackService {
*/
public void sendRecallCommand(TaskExecutionContext taskExecutionContext) {
TaskRecallCommand taskRecallCommand =
buildRecallCommand(taskExecutionContext);
- ResponseCache.get().cache(taskExecutionContext.getTaskInstanceId(),
taskRecallCommand.convert2Command(), Event.WORKER_REJECT);
+ ResponseCache.get().cache(taskExecutionContext.getTaskInstanceId(),
taskRecallCommand.convert2Command(), TaskEventType.WORKER_REJECT);
send(taskExecutionContext.getTaskInstanceId(),
taskRecallCommand.convert2Command());
}
}
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java
index f691f7c8de..73b8d84cf7 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java
@@ -84,6 +84,7 @@ public class RetryReportTaskStatusThread extends
BaseDaemonThread {
private void retryRunningCommand(ResponseCache instance) {
if (!instance.getRunningCache().isEmpty()) {
Map<Integer, Command> runningCache = instance.getRunningCache();
+ logger.info("Send task running retry command starting, waiting to
retry size: {}", runningCache.size());
for (Map.Entry<Integer, Command> entry : runningCache.entrySet()) {
Integer taskInstanceId = entry.getKey();
Command runningCommand = entry.getValue();
@@ -93,12 +94,14 @@ public class RetryReportTaskStatusThread extends
BaseDaemonThread {
logger.error("Retry send running command to master error,
taskInstanceId: {}, command: {}", taskInstanceId, runningCommand);
}
}
+ logger.info("Send task running retry command finished, waiting to
retry size: {}", runningCache.size());
}
}
private void retryResponseCommand(ResponseCache instance) {
- if (!instance.getResponseCache().isEmpty()) {
- Map<Integer, Command> responseCache = instance.getResponseCache();
+ Map<Integer, Command> responseCache = instance.getResponseCache();
+ if (!responseCache.isEmpty()) {
+ logger.info("Send task response retry command starting, waiting to
retry size: {}", responseCache.size());
for (Map.Entry<Integer, Command> entry : responseCache.entrySet())
{
Integer taskInstanceId = entry.getKey();
Command responseCommand = entry.getValue();
@@ -108,12 +111,14 @@ public class RetryReportTaskStatusThread extends
BaseDaemonThread {
logger.error("Retry send response command to master error,
taskInstanceId: {}, command: {}", taskInstanceId, responseCommand);
}
}
+ logger.info("Send task response retry command finished, waiting to
retry size: {}", responseCache.size());
}
}
private void retryRecallCommand(ResponseCache instance) {
- if (!instance.getRecallCache().isEmpty()) {
- Map<Integer, Command> recallCache = instance.getRecallCache();
+ Map<Integer, Command> recallCache = instance.getRecallCache();
+ if (!recallCache.isEmpty()) {
+ logger.info("Send task recall retry command starting, waiting to
retry size: {}", recallCache.size());
for (Map.Entry<Integer, Command> entry : recallCache.entrySet()) {
Integer taskInstanceId = entry.getKey();
Command responseCommand = entry.getValue();
@@ -123,6 +128,7 @@ public class RetryReportTaskStatusThread extends
BaseDaemonThread {
logger.error("Retry send recall command to master error,
taskInstanceId: {}, command: {}", taskInstanceId, responseCommand);
}
}
+ logger.info("Send task recall retry command finished, waiting to
retry size: {}", recallCache.size());
}
}
}