This is an automated email from the ASF dual-hosted git repository.
kerwin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 5e9f1de075 Remove state check for dependent/subProcess in StateWheelExecuteThread (#14242)
5e9f1de075 is described below
commit 5e9f1de075334f58145c585b3a02456e9342c114
Author: Wenjun Ruan <[email protected]>
AuthorDate: Wed May 31 16:39:12 2023 +0800
Remove state check for dependent/subProcess in StateWheelExecuteThread (#14242)
---
.../master/runner/StateWheelExecuteThread.java | 76 ----------------------
.../master/runner/WorkflowExecuteRunnable.java | 2 -
2 files changed, 78 deletions(-)
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
index 5576f13050..5c569b4744 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
@@ -73,11 +73,6 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
*/
private final ConcurrentLinkedQueue<TaskInstanceKey>
taskInstanceRetryCheckList = new ConcurrentLinkedQueue<>();
- /**
- * task state check list
- */
- private final ConcurrentLinkedQueue<TaskInstanceKey>
taskInstanceStateCheckList = new ConcurrentLinkedQueue<>();
-
@Autowired
private MasterConfig masterConfig;
@@ -104,7 +99,6 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
try {
checkTask4Timeout();
checkTask4Retry();
- checkTask4State();
checkProcess4Timeout();
} catch (Exception e) {
log.error("state wheel thread check error:", e);
@@ -214,30 +208,10 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
log.info("remove task instance from retry check list");
}
- public void addTask4StateCheck(@NonNull ProcessInstance processInstance,
@NonNull TaskInstance taskInstance) {
- log.info("Adding task instance into state check list");
- TaskInstanceKey taskInstanceKey =
TaskInstanceKey.getTaskInstanceKey(processInstance, taskInstance);
- if (taskInstanceStateCheckList.contains(taskInstanceKey)) {
- log.warn("Task instance is already in state check list");
- return;
- }
- if (taskInstance.isDependTask() || taskInstance.isSubProcess()) {
- taskInstanceStateCheckList.add(taskInstanceKey);
- log.info("Added task instance into state check list");
- }
- }
-
- public void removeTask4StateCheck(@NonNull ProcessInstance
processInstance, @NonNull TaskInstance taskInstance) {
- TaskInstanceKey taskInstanceKey =
TaskInstanceKey.getTaskInstanceKey(processInstance, taskInstance);
- taskInstanceStateCheckList.remove(taskInstanceKey);
- log.info("Removed task instance from state check list");
- }
-
public void clearAllTasks() {
processInstanceTimeoutCheckList.clear();
taskInstanceTimeoutCheckList.clear();
taskInstanceRetryCheckList.clear();
- taskInstanceStateCheckList.clear();
}
private void checkTask4Timeout() {
@@ -352,56 +326,6 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
}
}
- private void checkTask4State() {
- if (taskInstanceStateCheckList.isEmpty()) {
- return;
- }
- for (TaskInstanceKey taskInstanceKey : taskInstanceStateCheckList) {
- int processInstanceId = taskInstanceKey.getProcessInstanceId();
- long taskCode = taskInstanceKey.getTaskCode();
-
- try {
- LogUtils.setTaskInstanceIdMDC(processInstanceId);
- WorkflowExecuteRunnable workflowExecuteThread =
-
processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
- if (workflowExecuteThread == null) {
- log.warn(
- "Task instance state check failed, can not find
workflowExecuteThread from cache manager, will remove this check task");
- taskInstanceStateCheckList.remove(taskInstanceKey);
- continue;
- }
- Optional<TaskInstance> taskInstanceOptional =
-
workflowExecuteThread.getActiveTaskInstanceByTaskCode(taskCode);
- if (!taskInstanceOptional.isPresent()) {
- log.warn(
- "Task instance state check failed, can not find
taskInstance from workflowExecuteThread, will remove this check event");
- taskInstanceStateCheckList.remove(taskInstanceKey);
- continue;
- }
- TaskInstance taskInstance = taskInstanceOptional.get();
- if (taskInstance.getState().isFinished()) {
- continue;
- }
- addTaskStateChangeEvent(taskInstance);
- } catch (Exception ex) {
- log.error("Task state check error, taskInstanceKey: {}",
taskInstanceKey, ex);
- } finally {
- LogUtils.removeWorkflowInstanceIdMDC();
- }
- }
- }
-
- private void addTaskStateChangeEvent(TaskInstance taskInstance) {
- TaskStateEvent stateEvent = TaskStateEvent.builder()
- .processInstanceId(taskInstance.getProcessInstanceId())
- .taskInstanceId(taskInstance.getId())
- .taskCode(taskInstance.getTaskCode())
- .type(StateEventType.TASK_STATE_CHANGE)
- .status(TaskExecutionStatus.RUNNING_EXECUTION)
- .build();
- workflowExecuteThreadPool.submitStateEvent(stateEvent);
- }
-
private void addProcessStopEvent(ProcessInstance processInstance) {
WorkflowStateEvent stateEvent = WorkflowStateEvent.builder()
.processInstanceId(processInstance.getId())
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index c7b7a37bac..6bc43ce69f 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -442,7 +442,6 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatus> {
taskExecuteRunnableMap.remove(taskInstance.getTaskCode());
stateWheelExecuteThread.removeTask4TimeoutCheck(processInstance,
taskInstance);
stateWheelExecuteThread.removeTask4RetryCheck(processInstance,
taskInstance);
- stateWheelExecuteThread.removeTask4StateCheck(processInstance,
taskInstance);
if (taskInstance.getState().isSuccess()) {
completeTaskMap.put(taskInstance.getTaskCode(),
taskInstance.getId());
@@ -1043,7 +1042,6 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatus> {
taskExecuteRunnable.dispatch();
stateWheelExecuteThread.addTask4TimeoutCheck(processInstance,
taskInstance);
- stateWheelExecuteThread.addTask4StateCheck(processInstance,
taskInstance);
return true;
}
} catch (Exception e) {