This is an automated email from the ASF dual-hosted git repository.

leonbao pushed a commit to branch 3.1.9-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/3.1.9-prepare by this push:
     new 160fde19aa Revert "fix duplicate event (#14986)" (#15006)
160fde19aa is described below

commit 160fde19aa8f59278ab565f33ce10627f9d2ed09
Author: caishunfeng <[email protected]>
AuthorDate: Wed Oct 11 10:11:06 2023 +0800

    Revert "fix duplicate event (#14986)" (#15006)
    
    This reverts commit 7a38b87c9a9097465ff5966ebc43cf55f4bdf75c.
---
 .../master/runner/StateWheelExecuteThread.java     | 20 ++++-------
 .../master/runner/WorkflowExecuteRunnable.java     | 41 ++++++----------------
 .../master/runner/WorkflowExecuteThreadPool.java   | 25 +++----------
 3 files changed, 23 insertions(+), 63 deletions(-)

diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
index 5f526115d5..04d4dc5621 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
@@ -17,11 +17,12 @@
 
 package org.apache.dolphinscheduler.server.master.runner;
 
+import lombok.NonNull;
 import org.apache.dolphinscheduler.common.constants.Constants;
 import org.apache.dolphinscheduler.common.enums.StateEventType;
 import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
-import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
 import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
+import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
 import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
 import org.apache.dolphinscheduler.common.utils.DateUtils;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
@@ -34,20 +35,16 @@ import 
org.apache.dolphinscheduler.server.master.event.TaskStateEvent;
 import org.apache.dolphinscheduler.server.master.event.WorkflowStateEvent;
 import org.apache.dolphinscheduler.server.master.runner.task.TaskInstanceKey;
 import org.apache.dolphinscheduler.service.utils.LoggerUtils;
-
-import java.util.Optional;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-import javax.annotation.PostConstruct;
-
-import lombok.NonNull;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.context.annotation.Lazy;
 import org.springframework.stereotype.Component;
 
+import javax.annotation.PostConstruct;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
 /**
  * Check thread
  * 1. timeout task check
@@ -404,10 +401,7 @@ public class StateWheelExecuteThread extends 
BaseDaemonThread {
                 .type(StateEventType.TASK_STATE_CHANGE)
                 .status(TaskExecutionStatus.RUNNING_EXECUTION)
                 .build();
-        // will skip submit check event if existed, avoid event stacking
-        if (!workflowExecuteThreadPool.existStateEvent(stateEvent)) {
-            workflowExecuteThreadPool.submitStateEvent(stateEvent);
-        }
+        workflowExecuteThreadPool.submitStateEvent(stateEvent);
     }
 
     private void addProcessStopEvent(ProcessInstance processInstance) {
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index 50bfc2099c..8bbd0bb064 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -88,7 +88,6 @@ import 
org.apache.dolphinscheduler.service.process.ProcessService;
 import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue;
 import org.apache.dolphinscheduler.service.utils.DagHelper;
 import org.apache.dolphinscheduler.service.utils.LoggerUtils;
-
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.math.NumberUtils;
@@ -292,19 +291,19 @@ public class WorkflowExecuteRunnable implements 
Callable<WorkflowSubmitStatue> {
             } catch (StateEventHandleError stateEventHandleError) {
                 logger.error("State event handle error, will remove this 
event: {}", stateEvent, stateEventHandleError);
                 this.stateEvents.remove(stateEvent);
-                ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS_SHORT);
+                ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
             } catch (StateEventHandleException stateEventHandleException) {
                 logger.error("State event handle error, will retry this event: 
{}",
                         stateEvent,
                         stateEventHandleException);
-                ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS_SHORT);
+                ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
             } catch (Exception e) {
                 // we catch the exception here, since if the state event 
handle failed, the state event will still keep
                 // in the stateEvents queue.
                 logger.error("State event handle error, get a unknown 
exception, will retry this event: {}",
                         stateEvent,
                         e);
-                ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS_SHORT);
+                ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
             } finally {
                 LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
             }
@@ -323,18 +322,6 @@ public class WorkflowExecuteRunnable implements 
Callable<WorkflowSubmitStatue> {
         return key;
     }
 
-    public boolean existStateEvent(StateEvent stateEvent) {
-        if (CollectionUtils.isNotEmpty(this.stateEvents)) {
-            Optional<StateEvent> optional = this.stateEvents.stream()
-                    .filter(e -> e.getProcessInstanceId() == 
stateEvent.getProcessInstanceId()
-                            && Objects.equals(e.getTaskInstanceId(), 
stateEvent.getTaskInstanceId())
-                            && e.getType() == stateEvent.getType())
-                    .findFirst();
-            return optional.isPresent();
-        }
-        return false;
-    }
-
     public boolean addStateEvent(StateEvent stateEvent) {
         if (processInstance.getId() != stateEvent.getProcessInstanceId()) {
             logger.info("state event would be abounded :{}", stateEvent);
@@ -619,8 +606,7 @@ public class WorkflowExecuteRunnable implements 
Callable<WorkflowSubmitStatue> {
         Date scheduleDate = processInstance.getScheduleTime();
         if (scheduleDate == null) {
             if (CollectionUtils.isEmpty(complementListDate)) {
-                logger.info("complementListDate is empty, process complement 
end. process id:{}",
-                        processInstance.getId());
+                logger.info("complementListDate is empty, process complement 
end. process id:{}", processInstance.getId());
 
                 return true;
             }
@@ -845,8 +831,7 @@ public class WorkflowExecuteRunnable implements 
Callable<WorkflowSubmitStatue> {
                             task.getTaskCode(),
                             task.getState());
                     if (validTaskMap.containsKey(task.getTaskCode())) {
-                        logger.warn(
-                                "Have same taskCode taskInstance when init 
task queue, need to check taskExecutionStatus, taskCode:{}",
+                        logger.warn("Have same taskCode taskInstance when init 
task queue, need to check taskExecutionStatus, taskCode:{}",
                                 task.getTaskCode());
                         int oldTaskInstanceId = 
validTaskMap.get(task.getTaskCode());
                         TaskInstance oldTaskInstance = 
taskInstanceMap.get(oldTaskInstanceId);
@@ -995,8 +980,7 @@ public class WorkflowExecuteRunnable implements 
Callable<WorkflowSubmitStatue> {
                         taskInstance.getProcessInstanceId(),
                         taskInstance.getTaskGroupPriority());
                 if (!acquireTaskGroup) {
-                    logger.info(
-                            "Submitted task will not be dispatch right now 
because the first time to try to acquire" +
+                    logger.info("Submitted task will not be dispatch right now 
because the first time to try to acquire" +
                                     " task group failed, taskInstanceName: {}, 
taskGroupId: {}",
                             taskInstance.getName(), taskGroupId);
                     return Optional.of(taskInstance);
@@ -1005,8 +989,7 @@ public class WorkflowExecuteRunnable implements 
Callable<WorkflowSubmitStatue> {
 
             boolean dispatchSuccess = 
taskProcessor.action(TaskAction.DISPATCH);
             if (!dispatchSuccess) {
-                logger.error("Dispatch standby process {} task {} failed", 
processInstance.getName(),
-                        taskInstance.getName());
+                logger.error("Dispatch standby process {} task {} failed", 
processInstance.getName(), taskInstance.getName());
                 return Optional.empty();
             }
             taskProcessor.action(TaskAction.RUN);
@@ -1463,10 +1446,9 @@ public class WorkflowExecuteRunnable implements 
Callable<WorkflowSubmitStatue> {
             List<String> nextTaskList =
                     DagHelper.parseConditionTask(dependNodeName, 
skipTaskNodeMap, dag, getCompleteTaskInstanceMap());
             if (!nextTaskList.contains(nextNodeName)) {
-                logger.info(
-                        "DependTask is a condition task, and its next 
condition branch does not hava current task, " +
-                                "dependTaskCode: {}, currentTaskCode: {}",
-                        dependNodeName, nextNodeName);
+                logger.info("DependTask is a condition task, and its next 
condition branch does not hava current task, " +
+                                "dependTaskCode: {}, currentTaskCode: {}", 
dependNodeName, nextNodeName
+                        );
                 return false;
             }
         } else {
@@ -1842,8 +1824,7 @@ public class WorkflowExecuteRunnable implements 
Callable<WorkflowSubmitStatue> {
                 TaskInstance retryTask = 
processService.findTaskInstanceById(task.getId());
                 if (retryTask != null && 
retryTask.getState().isForceSuccess()) {
                     task.setState(retryTask.getState());
-                    logger.info(
-                            "Task {} has been forced success, put it into 
complete task list and stop retrying, taskInstanceId: {}",
+                    logger.info("Task {} has been forced success, put it into 
complete task list and stop retrying, taskInstanceId: {}",
                             task.getName(), task.getId());
                     removeTaskFromStandbyList(task);
                     completeTaskMap.put(task.getTaskCode(), task.getId());
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
index 510e27297a..e26df60244 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
@@ -17,6 +17,8 @@
 
 package org.apache.dolphinscheduler.server.master.runner;
 
+import com.google.common.base.Strings;
+import lombok.NonNull;
 import org.apache.dolphinscheduler.common.enums.Flag;
 import org.apache.dolphinscheduler.common.enums.StateEventType;
 import org.apache.dolphinscheduler.common.utils.NetUtils;
@@ -32,14 +34,6 @@ import 
org.apache.dolphinscheduler.server.master.event.StateEvent;
 import org.apache.dolphinscheduler.server.master.event.TaskStateEvent;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.apache.dolphinscheduler.service.utils.LoggerUtils;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import javax.annotation.PostConstruct;
-
-import lombok.NonNull;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -48,7 +42,9 @@ import org.springframework.stereotype.Component;
 import org.springframework.util.concurrent.ListenableFuture;
 import org.springframework.util.concurrent.ListenableFutureCallback;
 
-import com.google.common.base.Strings;
+import javax.annotation.PostConstruct;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * Used to execute {@link WorkflowExecuteRunnable}.
@@ -86,17 +82,6 @@ public class WorkflowExecuteThreadPool extends 
ThreadPoolTaskExecutor {
         this.setCorePoolSize(masterConfig.getExecThreads());
     }
 
-    public boolean existStateEvent(StateEvent stateEvent) {
-        WorkflowExecuteRunnable workflowExecuteThread =
-                
processInstanceExecCacheManager.getByProcessInstanceId(stateEvent.getProcessInstanceId());
-        if (workflowExecuteThread == null) {
-            logger.warn("Submit state event error, cannot from 
workflowExecuteThread from cache manager, stateEvent:{}",
-                    stateEvent);
-            return false;
-        }
-        return workflowExecuteThread.existStateEvent(stateEvent);
-    }
-
     /**
      * submit state event
      */

Reply via email to