This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch 3.1.9-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/3.1.9-prepare by this push:
new 7a38b87c9a fix duplicate event (#14986)
7a38b87c9a is described below
commit 7a38b87c9a9097465ff5966ebc43cf55f4bdf75c
Author: caishunfeng <[email protected]>
AuthorDate: Sat Oct 7 22:26:17 2023 +0800
fix duplicate event (#14986)
---
.../master/runner/StateWheelExecuteThread.java | 20 +++++++----
.../master/runner/WorkflowExecuteRunnable.java | 41 ++++++++++++++++------
.../master/runner/WorkflowExecuteThreadPool.java | 25 ++++++++++---
3 files changed, 63 insertions(+), 23 deletions(-)
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
index 04d4dc5621..5f526115d5 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
@@ -17,12 +17,11 @@
package org.apache.dolphinscheduler.server.master.runner;
-import lombok.NonNull;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
-import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
+import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
@@ -35,16 +34,20 @@ import
org.apache.dolphinscheduler.server.master.event.TaskStateEvent;
import org.apache.dolphinscheduler.server.master.event.WorkflowStateEvent;
import org.apache.dolphinscheduler.server.master.runner.task.TaskInstanceKey;
import org.apache.dolphinscheduler.service.utils.LoggerUtils;
+
+import java.util.Optional;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import javax.annotation.PostConstruct;
+
+import lombok.NonNull;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
-import javax.annotation.PostConstruct;
-import java.util.Optional;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
/**
* Check thread
* 1. timeout task check
@@ -401,7 +404,10 @@ public class StateWheelExecuteThread extends
BaseDaemonThread {
.type(StateEventType.TASK_STATE_CHANGE)
.status(TaskExecutionStatus.RUNNING_EXECUTION)
.build();
- workflowExecuteThreadPool.submitStateEvent(stateEvent);
+ // will skip submit check event if existed, avoid event stacking
+ if (!workflowExecuteThreadPool.existStateEvent(stateEvent)) {
+ workflowExecuteThreadPool.submitStateEvent(stateEvent);
+ }
}
private void addProcessStopEvent(ProcessInstance processInstance) {
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index 8bbd0bb064..50bfc2099c 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -88,6 +88,7 @@ import
org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue;
import org.apache.dolphinscheduler.service.utils.DagHelper;
import org.apache.dolphinscheduler.service.utils.LoggerUtils;
+
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
@@ -291,19 +292,19 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatue> {
} catch (StateEventHandleError stateEventHandleError) {
logger.error("State event handle error, will remove this
event: {}", stateEvent, stateEventHandleError);
this.stateEvents.remove(stateEvent);
- ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
+ ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS_SHORT);
} catch (StateEventHandleException stateEventHandleException) {
logger.error("State event handle error, will retry this event:
{}",
stateEvent,
stateEventHandleException);
- ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
+ ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS_SHORT);
} catch (Exception e) {
// we catch the exception here, since if the state event
handle failed, the state event will still keep
// in the stateEvents queue.
logger.error("State event handle error, get a unknown
exception, will retry this event: {}",
stateEvent,
e);
- ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
+ ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS_SHORT);
} finally {
LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
}
@@ -322,6 +323,18 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatue> {
return key;
}
+ public boolean existStateEvent(StateEvent stateEvent) {
+ if (CollectionUtils.isNotEmpty(this.stateEvents)) {
+ Optional<StateEvent> optional = this.stateEvents.stream()
+ .filter(e -> e.getProcessInstanceId() ==
stateEvent.getProcessInstanceId()
+ && Objects.equals(e.getTaskInstanceId(),
stateEvent.getTaskInstanceId())
+ && e.getType() == stateEvent.getType())
+ .findFirst();
+ return optional.isPresent();
+ }
+ return false;
+ }
+
public boolean addStateEvent(StateEvent stateEvent) {
if (processInstance.getId() != stateEvent.getProcessInstanceId()) {
logger.info("state event would be abounded :{}", stateEvent);
@@ -606,7 +619,8 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatue> {
Date scheduleDate = processInstance.getScheduleTime();
if (scheduleDate == null) {
if (CollectionUtils.isEmpty(complementListDate)) {
- logger.info("complementListDate is empty, process complement
end. process id:{}", processInstance.getId());
+ logger.info("complementListDate is empty, process complement
end. process id:{}",
+ processInstance.getId());
return true;
}
@@ -831,7 +845,8 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatue> {
task.getTaskCode(),
task.getState());
if (validTaskMap.containsKey(task.getTaskCode())) {
- logger.warn("Have same taskCode taskInstance when init
task queue, need to check taskExecutionStatus, taskCode:{}",
+ logger.warn(
+ "Have same taskCode taskInstance when init
task queue, need to check taskExecutionStatus, taskCode:{}",
task.getTaskCode());
int oldTaskInstanceId =
validTaskMap.get(task.getTaskCode());
TaskInstance oldTaskInstance =
taskInstanceMap.get(oldTaskInstanceId);
@@ -980,7 +995,8 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatue> {
taskInstance.getProcessInstanceId(),
taskInstance.getTaskGroupPriority());
if (!acquireTaskGroup) {
- logger.info("Submitted task will not be dispatch right now
because the first time to try to acquire" +
+ logger.info(
+ "Submitted task will not be dispatch right now
because the first time to try to acquire" +
" task group failed, taskInstanceName: {},
taskGroupId: {}",
taskInstance.getName(), taskGroupId);
return Optional.of(taskInstance);
@@ -989,7 +1005,8 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatue> {
boolean dispatchSuccess =
taskProcessor.action(TaskAction.DISPATCH);
if (!dispatchSuccess) {
- logger.error("Dispatch standby process {} task {} failed",
processInstance.getName(), taskInstance.getName());
+ logger.error("Dispatch standby process {} task {} failed",
processInstance.getName(),
+ taskInstance.getName());
return Optional.empty();
}
taskProcessor.action(TaskAction.RUN);
@@ -1446,9 +1463,10 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatue> {
List<String> nextTaskList =
DagHelper.parseConditionTask(dependNodeName,
skipTaskNodeMap, dag, getCompleteTaskInstanceMap());
if (!nextTaskList.contains(nextNodeName)) {
- logger.info("DependTask is a condition task, and its next
condition branch does not hava current task, " +
- "dependTaskCode: {}, currentTaskCode: {}",
dependNodeName, nextNodeName
- );
+ logger.info(
+ "DependTask is a condition task, and its next
condition branch does not hava current task, " +
+ "dependTaskCode: {}, currentTaskCode: {}",
+ dependNodeName, nextNodeName);
return false;
}
} else {
@@ -1824,7 +1842,8 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatue> {
TaskInstance retryTask =
processService.findTaskInstanceById(task.getId());
if (retryTask != null &&
retryTask.getState().isForceSuccess()) {
task.setState(retryTask.getState());
- logger.info("Task {} has been forced success, put it into
complete task list and stop retrying, taskInstanceId: {}",
+ logger.info(
+ "Task {} has been forced success, put it into
complete task list and stop retrying, taskInstanceId: {}",
task.getName(), task.getId());
removeTaskFromStandbyList(task);
completeTaskMap.put(task.getTaskCode(), task.getId());
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
index e26df60244..510e27297a 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
@@ -17,8 +17,6 @@
package org.apache.dolphinscheduler.server.master.runner;
-import com.google.common.base.Strings;
-import lombok.NonNull;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.utils.NetUtils;
@@ -34,6 +32,14 @@ import
org.apache.dolphinscheduler.server.master.event.StateEvent;
import org.apache.dolphinscheduler.server.master.event.TaskStateEvent;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.utils.LoggerUtils;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.annotation.PostConstruct;
+
+import lombok.NonNull;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -42,9 +48,7 @@ import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
-import javax.annotation.PostConstruct;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
+import com.google.common.base.Strings;
/**
* Used to execute {@link WorkflowExecuteRunnable}.
@@ -82,6 +86,17 @@ public class WorkflowExecuteThreadPool extends
ThreadPoolTaskExecutor {
this.setCorePoolSize(masterConfig.getExecThreads());
}
+ public boolean existStateEvent(StateEvent stateEvent) {
+ WorkflowExecuteRunnable workflowExecuteThread =
+
processInstanceExecCacheManager.getByProcessInstanceId(stateEvent.getProcessInstanceId());
+ if (workflowExecuteThread == null) {
+ logger.warn("Submit state event error, cannot from
workflowExecuteThread from cache manager, stateEvent:{}",
+ stateEvent);
+ return false;
+ }
+ return workflowExecuteThread.existStateEvent(stateEvent);
+ }
+
/**
* submit state event
*/