This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch 3.1.9-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/3.1.9-prepare by this push:
     new 7a38b87c9a fix duplicate event (#14986)
7a38b87c9a is described below

commit 7a38b87c9a9097465ff5966ebc43cf55f4bdf75c
Author: caishunfeng <[email protected]>
AuthorDate: Sat Oct 7 22:26:17 2023 +0800

    fix duplicate event (#14986)
---
 .../master/runner/StateWheelExecuteThread.java     | 20 +++++++----
 .../master/runner/WorkflowExecuteRunnable.java     | 41 ++++++++++++++++------
 .../master/runner/WorkflowExecuteThreadPool.java   | 25 ++++++++++---
 3 files changed, 63 insertions(+), 23 deletions(-)

diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
index 04d4dc5621..5f526115d5 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
@@ -17,12 +17,11 @@
 
 package org.apache.dolphinscheduler.server.master.runner;
 
-import lombok.NonNull;
 import org.apache.dolphinscheduler.common.constants.Constants;
 import org.apache.dolphinscheduler.common.enums.StateEventType;
 import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
-import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
 import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
+import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
 import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
 import org.apache.dolphinscheduler.common.utils.DateUtils;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
@@ -35,16 +34,20 @@ import org.apache.dolphinscheduler.server.master.event.TaskStateEvent;
 import org.apache.dolphinscheduler.server.master.event.WorkflowStateEvent;
 import org.apache.dolphinscheduler.server.master.runner.task.TaskInstanceKey;
 import org.apache.dolphinscheduler.service.utils.LoggerUtils;
+
+import java.util.Optional;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import javax.annotation.PostConstruct;
+
+import lombok.NonNull;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.context.annotation.Lazy;
 import org.springframework.stereotype.Component;
 
-import javax.annotation.PostConstruct;
-import java.util.Optional;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
 /**
  * Check thread
  * 1. timeout task check
@@ -401,7 +404,10 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
                 .type(StateEventType.TASK_STATE_CHANGE)
                 .status(TaskExecutionStatus.RUNNING_EXECUTION)
                 .build();
-        workflowExecuteThreadPool.submitStateEvent(stateEvent);
+        // will skip submit check event if existed, avoid event stacking
+        if (!workflowExecuteThreadPool.existStateEvent(stateEvent)) {
+            workflowExecuteThreadPool.submitStateEvent(stateEvent);
+        }
     }
 
     private void addProcessStopEvent(ProcessInstance processInstance) {
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index 8bbd0bb064..50bfc2099c 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -88,6 +88,7 @@ import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue;
 import org.apache.dolphinscheduler.service.utils.DagHelper;
 import org.apache.dolphinscheduler.service.utils.LoggerUtils;
+
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.math.NumberUtils;
@@ -291,19 +292,19 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
             } catch (StateEventHandleError stateEventHandleError) {
                 logger.error("State event handle error, will remove this 
event: {}", stateEvent, stateEventHandleError);
                 this.stateEvents.remove(stateEvent);
-                ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
+                ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS_SHORT);
             } catch (StateEventHandleException stateEventHandleException) {
                 logger.error("State event handle error, will retry this event: 
{}",
                         stateEvent,
                         stateEventHandleException);
-                ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
+                ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS_SHORT);
             } catch (Exception e) {
                 // we catch the exception here, since if the state event 
handle failed, the state event will still keep
                 // in the stateEvents queue.
                 logger.error("State event handle error, get a unknown 
exception, will retry this event: {}",
                         stateEvent,
                         e);
-                ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
+                ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS_SHORT);
             } finally {
                 LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
             }
@@ -322,6 +323,18 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
         return key;
     }
 
+    public boolean existStateEvent(StateEvent stateEvent) {
+        if (CollectionUtils.isNotEmpty(this.stateEvents)) {
+            Optional<StateEvent> optional = this.stateEvents.stream()
+                    .filter(e -> e.getProcessInstanceId() == 
stateEvent.getProcessInstanceId()
+                            && Objects.equals(e.getTaskInstanceId(), 
stateEvent.getTaskInstanceId())
+                            && e.getType() == stateEvent.getType())
+                    .findFirst();
+            return optional.isPresent();
+        }
+        return false;
+    }
+
     public boolean addStateEvent(StateEvent stateEvent) {
         if (processInstance.getId() != stateEvent.getProcessInstanceId()) {
             logger.info("state event would be abounded :{}", stateEvent);
@@ -606,7 +619,8 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
         Date scheduleDate = processInstance.getScheduleTime();
         if (scheduleDate == null) {
             if (CollectionUtils.isEmpty(complementListDate)) {
-                logger.info("complementListDate is empty, process complement 
end. process id:{}", processInstance.getId());
+                logger.info("complementListDate is empty, process complement 
end. process id:{}",
+                        processInstance.getId());
 
                 return true;
             }
@@ -831,7 +845,8 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
                             task.getTaskCode(),
                             task.getState());
                     if (validTaskMap.containsKey(task.getTaskCode())) {
-                        logger.warn("Have same taskCode taskInstance when init 
task queue, need to check taskExecutionStatus, taskCode:{}",
+                        logger.warn(
+                                "Have same taskCode taskInstance when init 
task queue, need to check taskExecutionStatus, taskCode:{}",
                                 task.getTaskCode());
                         int oldTaskInstanceId = 
validTaskMap.get(task.getTaskCode());
                         TaskInstance oldTaskInstance = 
taskInstanceMap.get(oldTaskInstanceId);
@@ -980,7 +995,8 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
                         taskInstance.getProcessInstanceId(),
                         taskInstance.getTaskGroupPriority());
                 if (!acquireTaskGroup) {
-                    logger.info("Submitted task will not be dispatch right now 
because the first time to try to acquire" +
+                    logger.info(
+                            "Submitted task will not be dispatch right now 
because the first time to try to acquire" +
                                     " task group failed, taskInstanceName: {}, 
taskGroupId: {}",
                             taskInstance.getName(), taskGroupId);
                     return Optional.of(taskInstance);
@@ -989,7 +1005,8 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
 
             boolean dispatchSuccess = 
taskProcessor.action(TaskAction.DISPATCH);
             if (!dispatchSuccess) {
-                logger.error("Dispatch standby process {} task {} failed", 
processInstance.getName(), taskInstance.getName());
+                logger.error("Dispatch standby process {} task {} failed", 
processInstance.getName(),
+                        taskInstance.getName());
                 return Optional.empty();
             }
             taskProcessor.action(TaskAction.RUN);
@@ -1446,9 +1463,10 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
             List<String> nextTaskList =
                     DagHelper.parseConditionTask(dependNodeName, 
skipTaskNodeMap, dag, getCompleteTaskInstanceMap());
             if (!nextTaskList.contains(nextNodeName)) {
-                logger.info("DependTask is a condition task, and its next 
condition branch does not hava current task, " +
-                                "dependTaskCode: {}, currentTaskCode: {}", 
dependNodeName, nextNodeName
-                        );
+                logger.info(
+                        "DependTask is a condition task, and its next 
condition branch does not hava current task, " +
+                                "dependTaskCode: {}, currentTaskCode: {}",
+                        dependNodeName, nextNodeName);
                 return false;
             }
         } else {
@@ -1824,7 +1842,8 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
                 TaskInstance retryTask = 
processService.findTaskInstanceById(task.getId());
                 if (retryTask != null && 
retryTask.getState().isForceSuccess()) {
                     task.setState(retryTask.getState());
-                    logger.info("Task {} has been forced success, put it into 
complete task list and stop retrying, taskInstanceId: {}",
+                    logger.info(
+                            "Task {} has been forced success, put it into 
complete task list and stop retrying, taskInstanceId: {}",
                             task.getName(), task.getId());
                     removeTaskFromStandbyList(task);
                     completeTaskMap.put(task.getTaskCode(), task.getId());
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
index e26df60244..510e27297a 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
@@ -17,8 +17,6 @@
 
 package org.apache.dolphinscheduler.server.master.runner;
 
-import com.google.common.base.Strings;
-import lombok.NonNull;
 import org.apache.dolphinscheduler.common.enums.Flag;
 import org.apache.dolphinscheduler.common.enums.StateEventType;
 import org.apache.dolphinscheduler.common.utils.NetUtils;
@@ -34,6 +32,14 @@ import org.apache.dolphinscheduler.server.master.event.StateEvent;
 import org.apache.dolphinscheduler.server.master.event.TaskStateEvent;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.apache.dolphinscheduler.service.utils.LoggerUtils;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.annotation.PostConstruct;
+
+import lombok.NonNull;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -42,9 +48,7 @@ import org.springframework.stereotype.Component;
 import org.springframework.util.concurrent.ListenableFuture;
 import org.springframework.util.concurrent.ListenableFutureCallback;
 
-import javax.annotation.PostConstruct;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
+import com.google.common.base.Strings;
 
 /**
  * Used to execute {@link WorkflowExecuteRunnable}.
@@ -82,6 +86,17 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
         this.setCorePoolSize(masterConfig.getExecThreads());
     }
 
+    public boolean existStateEvent(StateEvent stateEvent) {
+        WorkflowExecuteRunnable workflowExecuteThread =
+                
processInstanceExecCacheManager.getByProcessInstanceId(stateEvent.getProcessInstanceId());
+        if (workflowExecuteThread == null) {
+            logger.warn("Submit state event error, cannot from 
workflowExecuteThread from cache manager, stateEvent:{}",
+                    stateEvent);
+            return false;
+        }
+        return workflowExecuteThread.existStateEvent(stateEvent);
+    }
+
     /**
      * submit state event
      */

Reply via email to