This is an automated email from the ASF dual-hosted git repository.

kerwin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 5e9f1de075 Remove state check for dependent/subProcess in 
StateWheelExecuteThread (#14242)
5e9f1de075 is described below

commit 5e9f1de075334f58145c585b3a02456e9342c114
Author: Wenjun Ruan <[email protected]>
AuthorDate: Wed May 31 16:39:12 2023 +0800

    Remove state check for dependent/subProcess in StateWheelExecuteThread 
(#14242)
---
 .../master/runner/StateWheelExecuteThread.java     | 76 ----------------------
 .../master/runner/WorkflowExecuteRunnable.java     |  2 -
 2 files changed, 78 deletions(-)

diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
index 5576f13050..5c569b4744 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
@@ -73,11 +73,6 @@ public class StateWheelExecuteThread extends 
BaseDaemonThread {
      */
     private final ConcurrentLinkedQueue<TaskInstanceKey> 
taskInstanceRetryCheckList = new ConcurrentLinkedQueue<>();
 
-    /**
-     * task state check list
-     */
-    private final ConcurrentLinkedQueue<TaskInstanceKey> 
taskInstanceStateCheckList = new ConcurrentLinkedQueue<>();
-
     @Autowired
     private MasterConfig masterConfig;
 
@@ -104,7 +99,6 @@ public class StateWheelExecuteThread extends 
BaseDaemonThread {
             try {
                 checkTask4Timeout();
                 checkTask4Retry();
-                checkTask4State();
                 checkProcess4Timeout();
             } catch (Exception e) {
                 log.error("state wheel thread check error:", e);
@@ -214,30 +208,10 @@ public class StateWheelExecuteThread extends 
BaseDaemonThread {
         log.info("remove task instance from retry check list");
     }
 
-    public void addTask4StateCheck(@NonNull ProcessInstance processInstance, 
@NonNull TaskInstance taskInstance) {
-        log.info("Adding task instance into state check list");
-        TaskInstanceKey taskInstanceKey = 
TaskInstanceKey.getTaskInstanceKey(processInstance, taskInstance);
-        if (taskInstanceStateCheckList.contains(taskInstanceKey)) {
-            log.warn("Task instance is already in state check list");
-            return;
-        }
-        if (taskInstance.isDependTask() || taskInstance.isSubProcess()) {
-            taskInstanceStateCheckList.add(taskInstanceKey);
-            log.info("Added task instance into state check list");
-        }
-    }
-
-    public void removeTask4StateCheck(@NonNull ProcessInstance 
processInstance, @NonNull TaskInstance taskInstance) {
-        TaskInstanceKey taskInstanceKey = 
TaskInstanceKey.getTaskInstanceKey(processInstance, taskInstance);
-        taskInstanceStateCheckList.remove(taskInstanceKey);
-        log.info("Removed task instance from state check list");
-    }
-
     public void clearAllTasks() {
         processInstanceTimeoutCheckList.clear();
         taskInstanceTimeoutCheckList.clear();
         taskInstanceRetryCheckList.clear();
-        taskInstanceStateCheckList.clear();
     }
 
     private void checkTask4Timeout() {
@@ -352,56 +326,6 @@ public class StateWheelExecuteThread extends 
BaseDaemonThread {
         }
     }
 
-    private void checkTask4State() {
-        if (taskInstanceStateCheckList.isEmpty()) {
-            return;
-        }
-        for (TaskInstanceKey taskInstanceKey : taskInstanceStateCheckList) {
-            int processInstanceId = taskInstanceKey.getProcessInstanceId();
-            long taskCode = taskInstanceKey.getTaskCode();
-
-            try {
-                LogUtils.setTaskInstanceIdMDC(processInstanceId);
-                WorkflowExecuteRunnable workflowExecuteThread =
-                        
processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
-                if (workflowExecuteThread == null) {
-                    log.warn(
-                            "Task instance state check failed, can not find 
workflowExecuteThread from cache manager, will remove this check task");
-                    taskInstanceStateCheckList.remove(taskInstanceKey);
-                    continue;
-                }
-                Optional<TaskInstance> taskInstanceOptional =
-                        
workflowExecuteThread.getActiveTaskInstanceByTaskCode(taskCode);
-                if (!taskInstanceOptional.isPresent()) {
-                    log.warn(
-                            "Task instance state check failed, can not find 
taskInstance from workflowExecuteThread, will remove this check event");
-                    taskInstanceStateCheckList.remove(taskInstanceKey);
-                    continue;
-                }
-                TaskInstance taskInstance = taskInstanceOptional.get();
-                if (taskInstance.getState().isFinished()) {
-                    continue;
-                }
-                addTaskStateChangeEvent(taskInstance);
-            } catch (Exception ex) {
-                log.error("Task state check error, taskInstanceKey: {}", 
taskInstanceKey, ex);
-            } finally {
-                LogUtils.removeWorkflowInstanceIdMDC();
-            }
-        }
-    }
-
-    private void addTaskStateChangeEvent(TaskInstance taskInstance) {
-        TaskStateEvent stateEvent = TaskStateEvent.builder()
-                .processInstanceId(taskInstance.getProcessInstanceId())
-                .taskInstanceId(taskInstance.getId())
-                .taskCode(taskInstance.getTaskCode())
-                .type(StateEventType.TASK_STATE_CHANGE)
-                .status(TaskExecutionStatus.RUNNING_EXECUTION)
-                .build();
-        workflowExecuteThreadPool.submitStateEvent(stateEvent);
-    }
-
     private void addProcessStopEvent(ProcessInstance processInstance) {
         WorkflowStateEvent stateEvent = WorkflowStateEvent.builder()
                 .processInstanceId(processInstance.getId())
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index c7b7a37bac..6bc43ce69f 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -442,7 +442,6 @@ public class WorkflowExecuteRunnable implements 
Callable<WorkflowSubmitStatus> {
             taskExecuteRunnableMap.remove(taskInstance.getTaskCode());
             stateWheelExecuteThread.removeTask4TimeoutCheck(processInstance, 
taskInstance);
             stateWheelExecuteThread.removeTask4RetryCheck(processInstance, 
taskInstance);
-            stateWheelExecuteThread.removeTask4StateCheck(processInstance, 
taskInstance);
 
             if (taskInstance.getState().isSuccess()) {
                 completeTaskMap.put(taskInstance.getTaskCode(), 
taskInstance.getId());
@@ -1043,7 +1042,6 @@ public class WorkflowExecuteRunnable implements 
Callable<WorkflowSubmitStatus> {
                 taskExecuteRunnable.dispatch();
 
                 stateWheelExecuteThread.addTask4TimeoutCheck(processInstance, 
taskInstance);
-                stateWheelExecuteThread.addTask4StateCheck(processInstance, 
taskInstance);
                 return true;
             }
         } catch (Exception e) {

Reply via email to