This is an automated email from the ASF dual-hosted git repository.
zihaoxiang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new e2a9e76e9e [Fix-17355] Fix reassignWorkflowInstanceHost may failed
when no events in channel (#17372)
e2a9e76e9e is described below
commit e2a9e76e9e70328d3e7f250e31b66c2befefe77f
Author: lile <[email protected]>
AuthorDate: Tue Aug 5 13:37:17 2025 +0800
[Fix-17355] Fix reassignWorkflowInstanceHost may failed when no events in
channel (#17372)
---
.../TaskExecutorEventRemoteReporterClient.java | 58 ++++++++++++++--------
.../LogicTaskExecutorLifecycleEventReporter.java | 6 ++-
.../ITaskExecutorEventRemoteReporterClient.java | 3 +-
.../ITaskExecutorLifecycleEventReporter.java | 5 +-
.../TaskExecutorLifecycleEventRemoteReporter.java | 46 ++++++++++++++---
.../IReportableTaskExecutorLifecycleEvent.java | 10 ----
.../TaskExecutorDispatchedLifecycleEvent.java | 3 --
.../events/TaskExecutorFailedLifecycleEvent.java | 3 --
.../events/TaskExecutorKilledLifecycleEvent.java | 3 --
.../events/TaskExecutorLifecycleEventType.java | 6 +++
.../events/TaskExecutorPausedLifecycleEvent.java | 3 --
...xecutorRuntimeContextChangedLifecycleEvent.java | 3 --
.../events/TaskExecutorStartedLifecycleEvent.java | 3 --
.../events/TaskExecutorSuccessLifecycleEvent.java | 3 --
.../TaskExecutorLifecycleEventListener.java | 8 +--
.../worker/AbstractTaskExecutorWorker.java | 2 -
.../executor/PhysicalTaskEngineDelegator.java | 13 +++--
...PhysicalTaskExecutorLifecycleEventReporter.java | 6 ++-
18 files changed, 106 insertions(+), 78 deletions(-)
diff --git
a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/TaskExecutorEventRemoteReporterClient.java
b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/TaskExecutorEventRemoteReporterClient.java
index 955e44c662..cec9f8eab7 100644
---
a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/TaskExecutorEventRemoteReporterClient.java
+++
b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/TaskExecutorEventRemoteReporterClient.java
@@ -33,32 +33,39 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
public class TaskExecutorEventRemoteReporterClient implements
ITaskExecutorEventRemoteReporterClient {
- public void reportTaskExecutionEventToMaster(final
IReportableTaskExecutorLifecycleEvent taskExecutorLifecycleEvent) {
+ @Override
+ public void reportTaskExecutionEventToMaster(final String masterAddress,
+ final
IReportableTaskExecutorLifecycleEvent taskExecutorLifecycleEvent) {
try {
taskExecutorLifecycleEvent.setLatestReportTime(System.currentTimeMillis());
switch (taskExecutorLifecycleEvent.getType()) {
case DISPATCHED:
- reportTaskDispatchedEventToMaster(
+ reportTaskDispatchedEventToMaster(masterAddress,
(TaskExecutorDispatchedLifecycleEvent)
taskExecutorLifecycleEvent);
break;
case RUNNING:
-
reportTaskRunningEventToMaster((TaskExecutorStartedLifecycleEvent)
taskExecutorLifecycleEvent);
+ reportTaskRunningEventToMaster(masterAddress,
+ (TaskExecutorStartedLifecycleEvent)
taskExecutorLifecycleEvent);
break;
case RUNTIME_CONTEXT_CHANGE:
- reportTaskRuntimeContextChangeEventToMaster(
+ reportTaskRuntimeContextChangeEventToMaster(masterAddress,
(TaskExecutorRuntimeContextChangedLifecycleEvent)
taskExecutorLifecycleEvent);
break;
case PAUSED:
-
reportTaskPausedEventToMaster((TaskExecutorPausedLifecycleEvent)
taskExecutorLifecycleEvent);
+ reportTaskPausedEventToMaster(masterAddress,
+ (TaskExecutorPausedLifecycleEvent)
taskExecutorLifecycleEvent);
break;
case KILLED:
-
reportTaskKilledEventToMaster((TaskExecutorKilledLifecycleEvent)
taskExecutorLifecycleEvent);
+ reportTaskKilledEventToMaster(masterAddress,
+ (TaskExecutorKilledLifecycleEvent)
taskExecutorLifecycleEvent);
break;
case FAILED:
-
reportTaskFailedEventToMaster((TaskExecutorFailedLifecycleEvent)
taskExecutorLifecycleEvent);
+ reportTaskFailedEventToMaster(masterAddress,
+ (TaskExecutorFailedLifecycleEvent)
taskExecutorLifecycleEvent);
break;
case SUCCESS:
-
reportTaskSuccessEventToMaster((TaskExecutorSuccessLifecycleEvent)
taskExecutorLifecycleEvent);
+ reportTaskSuccessEventToMaster(masterAddress,
+ (TaskExecutorSuccessLifecycleEvent)
taskExecutorLifecycleEvent);
break;
default:
log.warn("Unsupported TaskExecutionEvent: {}",
taskExecutorLifecycleEvent);
@@ -69,52 +76,59 @@ public class TaskExecutorEventRemoteReporterClient
implements ITaskExecutorEvent
}
}
- private static void reportTaskDispatchedEventToMaster(final
TaskExecutorDispatchedLifecycleEvent taskExecutionDispatchedEvent) {
+ private static void reportTaskDispatchedEventToMaster(final String
masterAddress,
+ final
TaskExecutorDispatchedLifecycleEvent taskExecutionDispatchedEvent) {
Clients
.withService(ITaskExecutorEventListener.class)
-
.withHost(taskExecutionDispatchedEvent.getWorkflowInstanceHost())
+ .withHost(masterAddress)
.onTaskExecutorDispatched(taskExecutionDispatchedEvent);
}
- private static void reportTaskRunningEventToMaster(final
TaskExecutorStartedLifecycleEvent taskExecutionRunningEvent) {
+ private static void reportTaskRunningEventToMaster(final String
masterAddress,
+ final
TaskExecutorStartedLifecycleEvent taskExecutionRunningEvent) {
Clients
.withService(ITaskExecutorEventListener.class)
- .withHost(taskExecutionRunningEvent.getWorkflowInstanceHost())
+ .withHost(masterAddress)
.onTaskExecutorRunning(taskExecutionRunningEvent);
}
- private static void reportTaskRuntimeContextChangeEventToMaster(final
TaskExecutorRuntimeContextChangedLifecycleEvent taskExecutorLifecycleEvent) {
+ private static void reportTaskRuntimeContextChangeEventToMaster(final
String masterAddress,
+ final
TaskExecutorRuntimeContextChangedLifecycleEvent taskExecutorLifecycleEvent) {
Clients
.withService(ITaskExecutorEventListener.class)
- .withHost(taskExecutorLifecycleEvent.getWorkflowInstanceHost())
+ .withHost(masterAddress)
.onTaskExecutorRuntimeContextChanged(taskExecutorLifecycleEvent);
}
- private static void reportTaskPausedEventToMaster(final
TaskExecutorPausedLifecycleEvent taskExecutionPausedEvent) {
+ private static void reportTaskPausedEventToMaster(final String
masterAddress,
+ final
TaskExecutorPausedLifecycleEvent taskExecutionPausedEvent) {
Clients
.withService(ITaskExecutorEventListener.class)
- .withHost(taskExecutionPausedEvent.getWorkflowInstanceHost())
+ .withHost(masterAddress)
.onTaskExecutorPaused(taskExecutionPausedEvent);
}
- private static void reportTaskKilledEventToMaster(final
TaskExecutorKilledLifecycleEvent taskExecutionKilledEvent) {
+ private static void reportTaskKilledEventToMaster(final String
masterAddress,
+ final
TaskExecutorKilledLifecycleEvent taskExecutionKilledEvent) {
Clients
.withService(ITaskExecutorEventListener.class)
- .withHost(taskExecutionKilledEvent.getWorkflowInstanceHost())
+ .withHost(masterAddress)
.onTaskExecutorKilled(taskExecutionKilledEvent);
}
- private static void reportTaskFailedEventToMaster(final
TaskExecutorFailedLifecycleEvent taskExecutionFailedEvent) {
+ private static void reportTaskFailedEventToMaster(final String
masterAddress,
+ final
TaskExecutorFailedLifecycleEvent taskExecutionFailedEvent) {
Clients
.withService(ITaskExecutorEventListener.class)
- .withHost(taskExecutionFailedEvent.getWorkflowInstanceHost())
+ .withHost(masterAddress)
.onTaskExecutorFailed(taskExecutionFailedEvent);
}
- private static void reportTaskSuccessEventToMaster(final
TaskExecutorSuccessLifecycleEvent taskExecutionSuccessEvent) {
+ private static void reportTaskSuccessEventToMaster(final String
masterAddress,
+ final
TaskExecutorSuccessLifecycleEvent taskExecutionSuccessEvent) {
Clients
.withService(ITaskExecutorEventListener.class)
- .withHost(taskExecutionSuccessEvent.getWorkflowInstanceHost())
+ .withHost(masterAddress)
.onTaskExecutorSuccess(taskExecutionSuccessEvent);
}
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/LogicTaskExecutorLifecycleEventReporter.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/LogicTaskExecutorLifecycleEventReporter.java
index 41a80ba716..fd2a32313a 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/LogicTaskExecutorLifecycleEventReporter.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/LogicTaskExecutorLifecycleEventReporter.java
@@ -25,7 +25,9 @@ import org.springframework.stereotype.Component;
public class LogicTaskExecutorLifecycleEventReporter extends
TaskExecutorLifecycleEventRemoteReporter {
public LogicTaskExecutorLifecycleEventReporter(
- final
LogicTaskExecutorEventRemoteReporterClient
logicTaskExecutorEventRemoteReporterClient) {
- super("LogicTaskExecutorLifecycleEventReporter",
logicTaskExecutorEventRemoteReporterClient);
+ final
LogicTaskExecutorEventRemoteReporterClient
logicTaskExecutorEventRemoteReporterClient,
+ final
LogicTaskExecutorRepository taskExecutorRepository) {
+ super("LogicTaskExecutorLifecycleEventReporter",
logicTaskExecutorEventRemoteReporterClient,
+ taskExecutorRepository);
}
}
diff --git
a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/eventbus/ITaskExecutorEventRemoteReporterClient.java
b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/eventbus/ITaskExecutorEventRemoteReporterClient.java
index b7888df626..e7bfb866d8 100644
---
a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/eventbus/ITaskExecutorEventRemoteReporterClient.java
+++
b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/eventbus/ITaskExecutorEventRemoteReporterClient.java
@@ -21,5 +21,6 @@ import
org.apache.dolphinscheduler.task.executor.events.IReportableTaskExecutorL
public interface ITaskExecutorEventRemoteReporterClient {
- void reportTaskExecutionEventToMaster(final
IReportableTaskExecutorLifecycleEvent reportableTaskExecutorLifecycleEvent);
+ void reportTaskExecutionEventToMaster(final String masterAddress,
+ final
IReportableTaskExecutorLifecycleEvent reportableTaskExecutorLifecycleEvent);
}
diff --git
a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/eventbus/ITaskExecutorLifecycleEventReporter.java
b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/eventbus/ITaskExecutorLifecycleEventReporter.java
index 0eed5359c3..94e5737ca9 100644
---
a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/eventbus/ITaskExecutorLifecycleEventReporter.java
+++
b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/eventbus/ITaskExecutorLifecycleEventReporter.java
@@ -48,10 +48,9 @@ public interface ITaskExecutorLifecycleEventReporter extends
AutoCloseable {
void receiveTaskExecutorLifecycleEventACK(final
TaskExecutorLifecycleEventAck taskExecutorLifecycleEventAck);
/**
- * Reassign the workflow instance host of the
IReportableTaskExecutorLifecycleEvent.
- * <p> This method is used to reassign the workflow instance host of the
IReportableTaskExecutorLifecycleEvent, once the workflow's host changed.
+ * Reset the events in the channel to allow them to be reported
immediately.
*/
- boolean reassignWorkflowInstanceHost(int taskInstanceId, String
workflowHost);
+ void onWorkflowInstanceHostChanged(int taskInstanceId);
/**
* Shutdown the reporter.
diff --git
a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/eventbus/TaskExecutorLifecycleEventRemoteReporter.java
b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/eventbus/TaskExecutorLifecycleEventRemoteReporter.java
index b5187c8399..d83de7f98d 100644
---
a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/eventbus/TaskExecutorLifecycleEventRemoteReporter.java
+++
b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/eventbus/TaskExecutorLifecycleEventRemoteReporter.java
@@ -17,13 +17,18 @@
package org.apache.dolphinscheduler.task.executor.eventbus;
+import org.apache.dolphinscheduler.common.exception.BaseException;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.task.executor.ITaskExecutor;
+import org.apache.dolphinscheduler.task.executor.ITaskExecutorRepository;
import
org.apache.dolphinscheduler.task.executor.events.IReportableTaskExecutorLifecycleEvent;
+import
org.apache.dolphinscheduler.task.executor.events.TaskExecutorFinalizeLifecycleEvent;
import
org.apache.dolphinscheduler.task.executor.events.TaskExecutorLifecycleEventType;
import org.apache.dolphinscheduler.task.executor.log.TaskExecutorMDCUtils;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
@@ -56,11 +61,15 @@ public class TaskExecutorLifecycleEventRemoteReporter
extends BaseDaemonThread
private final Condition taskExecutionEventEmptyCondition =
eventChannelsLock.newCondition();
+ private final ITaskExecutorRepository taskExecutorRepository;
+
public TaskExecutorLifecycleEventRemoteReporter(final String reporterName,
- final
ITaskExecutorEventRemoteReporterClient taskExecutorEventRemoteReporterClient) {
+ final
ITaskExecutorEventRemoteReporterClient taskExecutorEventRemoteReporterClient,
+ final
ITaskExecutorRepository taskExecutorRepository) {
super(reporterName);
this.reporterName = reporterName;
this.taskExecutorEventRemoteReporterClient =
taskExecutorEventRemoteReporterClient;
+ this.taskExecutorRepository = taskExecutorRepository;
}
@Override
@@ -128,6 +137,11 @@ public class TaskExecutorLifecycleEventRemoteReporter
extends BaseDaemonThread
log.info("Failed removed ReportableTaskExecutorLifecycleEvent
by ack: {}", eventAck);
}
if (eventChannel.isEmpty()) {
+ // Extend the lifecycle of the TaskExecutor to span the entire
processing cycle of the task.
+ // so we can finalize the TaskExecutor after the associated
channel has been removed.
+ if (removed != null && removed.getType().isFinished()) {
+ finalizeTaskExecutor(removed.getTaskInstanceId());
+ }
eventChannels.remove(taskExecutorId);
log.debug("Removed
ReportableTaskExecutorLifecycleEventChannel: {}", taskExecutorId);
}
@@ -138,15 +152,14 @@ public class TaskExecutorLifecycleEventRemoteReporter
extends BaseDaemonThread
}
@Override
- public boolean reassignWorkflowInstanceHost(int taskInstanceId, String
workflowHost) {
+ public void onWorkflowInstanceHostChanged(int taskInstanceId) {
eventChannelsLock.lock();
try {
final ReportableTaskExecutorLifecycleEventChannel eventChannel =
eventChannels.get(taskInstanceId);
- if (eventChannel == null) {
- return false;
+ if (eventChannel != null) {
+ eventChannel.taskExecutionEventsQueue.forEach(event ->
event.setLatestReportTime(null));
+ taskExecutionEventEmptyCondition.signalAll();
}
- eventChannel.taskExecutionEventsQueue.forEach(event ->
event.setWorkflowInstanceHost(workflowHost));
- return true;
} finally {
eventChannelsLock.unlock();
}
@@ -164,6 +177,16 @@ public class TaskExecutorLifecycleEventRemoteReporter
extends BaseDaemonThread
return eventChannels;
}
+ private void finalizeTaskExecutor(final Integer taskExecutorId) {
+ final Optional<ITaskExecutor> taskExecutorOptional =
taskExecutorRepository.get(taskExecutorId);
+ if (taskExecutorOptional.isPresent()) {
+ taskExecutorOptional.get().getTaskExecutorEventBus()
+
.publish(TaskExecutorFinalizeLifecycleEvent.of(taskExecutorOptional.get()));
+ } else {
+ log.warn("TaskExecutor is not exists: {}", taskExecutorId);
+ }
+ }
+
private void handleTaskExecutionEventChannel(final
ReportableTaskExecutorLifecycleEventChannel
reportableTaskExecutorLifecycleEventChannel) {
if (reportableTaskExecutorLifecycleEventChannel.isEmpty()) {
return;
@@ -175,7 +198,16 @@ public class TaskExecutorLifecycleEventRemoteReporter
extends BaseDaemonThread
TaskExecutorMDCUtils.logWithMDC(headEvent.getTaskInstanceId())) {
try {
if (isTaskExecutorEventNeverSent(headEvent) ||
isRetryIntervalExceeded(headEvent)) {
-
taskExecutorEventRemoteReporterClient.reportTaskExecutionEventToMaster(headEvent);
+ final Optional<ITaskExecutor> taskExecutorOptional =
+
taskExecutorRepository.get(headEvent.getTaskInstanceId());
+ if (!taskExecutorOptional.isPresent()) {
+ throw new BaseException(String.format("The
TaskExecutor id %d is not exist.",
+ headEvent.getTaskInstanceId()));
+ }
+ final String masterAddress =
+
taskExecutorOptional.get().getTaskExecutionContext().getWorkflowInstanceHost();
+
taskExecutorEventRemoteReporterClient.reportTaskExecutionEventToMaster(masterAddress,
+ headEvent);
continue;
}
if (log.isDebugEnabled()) {
diff --git
a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/IReportableTaskExecutorLifecycleEvent.java
b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/IReportableTaskExecutorLifecycleEvent.java
index 795945c7f2..567927cc92 100644
---
a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/IReportableTaskExecutorLifecycleEvent.java
+++
b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/IReportableTaskExecutorLifecycleEvent.java
@@ -29,16 +29,6 @@ public interface IReportableTaskExecutorLifecycleEvent
extends ITaskExecutorLife
*/
int getWorkflowInstanceId();
- /**
- * The host of the workflow instance which the event should report to.
- */
- String getWorkflowInstanceHost();
-
- /**
- * Set the host of the workflow instance which the event should report to.
- */
- void setWorkflowInstanceHost(String workflowInstanceHost);
-
/**
* Get the latest report time of the event, if the event is never
reported, return null.
*/
diff --git
a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorDispatchedLifecycleEvent.java
b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorDispatchedLifecycleEvent.java
index e7daf61e15..9424d3bb27 100644
---
a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorDispatchedLifecycleEvent.java
+++
b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorDispatchedLifecycleEvent.java
@@ -37,8 +37,6 @@ public class TaskExecutorDispatchedLifecycleEvent extends
AbstractTaskExecutorLi
private int workflowInstanceId;
- private String workflowInstanceHost;
-
private String taskInstanceHost;
private Long latestReportTime;
@@ -49,7 +47,6 @@ public class TaskExecutorDispatchedLifecycleEvent extends
AbstractTaskExecutorLi
.taskInstanceId(taskExecutionContext.getTaskInstanceId())
.workflowInstanceId(taskExecutionContext.getWorkflowInstanceId())
.taskInstanceHost(taskExecutionContext.getHost())
-
.workflowInstanceHost(taskExecutionContext.getWorkflowInstanceHost())
.type(TaskExecutorLifecycleEventType.DISPATCHED)
.build();
}
diff --git
a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorFailedLifecycleEvent.java
b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorFailedLifecycleEvent.java
index a77795de3a..493853b223 100644
---
a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorFailedLifecycleEvent.java
+++
b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorFailedLifecycleEvent.java
@@ -37,8 +37,6 @@ public class TaskExecutorFailedLifecycleEvent extends
AbstractTaskExecutorLifecy
private int workflowInstanceId;
- private String workflowInstanceHost;
-
private String taskInstanceHost;
private String appIds;
@@ -53,7 +51,6 @@ public class TaskExecutorFailedLifecycleEvent extends
AbstractTaskExecutorLifecy
.taskInstanceId(taskExecutor.getId())
.workflowInstanceId(taskExecutionContext.getWorkflowInstanceId())
.taskInstanceHost(taskExecutionContext.getHost())
-
.workflowInstanceHost(taskExecutionContext.getWorkflowInstanceHost())
.appIds(taskExecutionContext.getAppIds())
.endTime(taskExecutionContext.getEndTime())
.type(TaskExecutorLifecycleEventType.FAILED)
diff --git
a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorKilledLifecycleEvent.java
b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorKilledLifecycleEvent.java
index d587aca68b..606effc03e 100644
---
a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorKilledLifecycleEvent.java
+++
b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorKilledLifecycleEvent.java
@@ -37,8 +37,6 @@ public class TaskExecutorKilledLifecycleEvent extends
AbstractTaskExecutorLifecy
private int workflowInstanceId;
- private String workflowInstanceHost;
-
private String taskInstanceHost;
private long endTime;
@@ -51,7 +49,6 @@ public class TaskExecutorKilledLifecycleEvent extends
AbstractTaskExecutorLifecy
.taskInstanceId(taskExecutionContext.getTaskInstanceId())
.workflowInstanceId(taskExecutionContext.getWorkflowInstanceId())
.taskInstanceHost(taskExecutionContext.getHost())
-
.workflowInstanceHost(taskExecutionContext.getWorkflowInstanceHost())
.endTime(taskExecutionContext.getEndTime())
.type(TaskExecutorLifecycleEventType.KILLED)
.build();
diff --git
a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorLifecycleEventType.java
b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorLifecycleEventType.java
index 88fa77835a..aa0aa2cd4e 100644
---
a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorLifecycleEventType.java
+++
b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorLifecycleEventType.java
@@ -40,4 +40,10 @@ public enum TaskExecutorLifecycleEventType {
FINALIZE,
;
+ public boolean isFinished() {
+ return (this == KILLED
+ || this == PAUSED
+ || this == FAILED
+ || this == SUCCESS);
+ }
}
diff --git
a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorPausedLifecycleEvent.java
b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorPausedLifecycleEvent.java
index 0057fb95d8..c81699148e 100644
---
a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorPausedLifecycleEvent.java
+++
b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorPausedLifecycleEvent.java
@@ -37,8 +37,6 @@ public class TaskExecutorPausedLifecycleEvent extends
AbstractTaskExecutorLifecy
private int workflowInstanceId;
- private String workflowInstanceHost;
-
private String taskInstanceHost;
private long endTime;
@@ -51,7 +49,6 @@ public class TaskExecutorPausedLifecycleEvent extends
AbstractTaskExecutorLifecy
.taskInstanceId(taskExecutionContext.getTaskInstanceId())
.workflowInstanceId(taskExecutionContext.getWorkflowInstanceId())
.taskInstanceHost(taskExecutionContext.getHost())
-
.workflowInstanceHost(taskExecutionContext.getWorkflowInstanceHost())
.endTime(taskExecutionContext.getEndTime())
.type(TaskExecutorLifecycleEventType.PAUSED)
.build();
diff --git
a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorRuntimeContextChangedLifecycleEvent.java
b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorRuntimeContextChangedLifecycleEvent.java
index 1febc2aea6..f759e0da9e 100644
---
a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorRuntimeContextChangedLifecycleEvent.java
+++
b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorRuntimeContextChangedLifecycleEvent.java
@@ -37,8 +37,6 @@ public class TaskExecutorRuntimeContextChangedLifecycleEvent
extends AbstractTas
private int workflowInstanceId;
- private String workflowInstanceHost;
-
private String taskInstanceHost;
@Deprecated
@@ -53,7 +51,6 @@ public class TaskExecutorRuntimeContextChangedLifecycleEvent
extends AbstractTas
return TaskExecutorRuntimeContextChangedLifecycleEvent.builder()
.taskInstanceId(taskExecutionContext.getTaskInstanceId())
.workflowInstanceId(taskExecutionContext.getWorkflowInstanceId())
-
.workflowInstanceHost(taskExecutionContext.getWorkflowInstanceHost())
.taskInstanceHost(taskExecutionContext.getHost())
.processId(taskExecutionContext.getProcessId())
.appIds(taskExecutionContext.getAppIds())
diff --git
a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorStartedLifecycleEvent.java
b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorStartedLifecycleEvent.java
index 6045f93e90..94126bb39d 100644
---
a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorStartedLifecycleEvent.java
+++
b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorStartedLifecycleEvent.java
@@ -37,8 +37,6 @@ public class TaskExecutorStartedLifecycleEvent extends
AbstractTaskExecutorLifec
private int workflowInstanceId;
- private String workflowInstanceHost;
-
private String taskInstanceHost;
private long startTime;
@@ -57,7 +55,6 @@ public class TaskExecutorStartedLifecycleEvent extends
AbstractTaskExecutorLifec
.workflowInstanceId(taskExecutionContext.getWorkflowInstanceId())
.taskInstanceHost(taskExecutionContext.getHost())
.startTime(taskExecutor.getTaskExecutionContext().getStartTime())
-
.workflowInstanceHost(taskExecutionContext.getWorkflowInstanceHost())
.logPath(taskExecutionContext.getLogPath())
.executePath(taskExecutionContext.getExecutePath())
.type(TaskExecutorLifecycleEventType.RUNNING)
diff --git
a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorSuccessLifecycleEvent.java
b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorSuccessLifecycleEvent.java
index 92a9e66e77..947d1edf36 100644
---
a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorSuccessLifecycleEvent.java
+++
b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorSuccessLifecycleEvent.java
@@ -40,8 +40,6 @@ public class TaskExecutorSuccessLifecycleEvent extends
AbstractTaskExecutorLifec
private int workflowInstanceId;
- private String workflowInstanceHost;
-
private String taskInstanceHost;
private long endTime;
@@ -54,7 +52,6 @@ public class TaskExecutorSuccessLifecycleEvent extends
AbstractTaskExecutorLifec
final TaskExecutionContext taskExecutionContext =
taskExecutor.getTaskExecutionContext();
return TaskExecutorSuccessLifecycleEvent.builder()
.workflowInstanceId(taskExecutionContext.getWorkflowInstanceId())
-
.workflowInstanceHost(taskExecutionContext.getWorkflowInstanceHost())
.taskInstanceId(taskExecutionContext.getTaskInstanceId())
.taskInstanceHost(taskExecutionContext.getHost())
.varPool(taskExecutionContext.getVarPool())
diff --git
a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/listener/TaskExecutorLifecycleEventListener.java
b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/listener/TaskExecutorLifecycleEventListener.java
index bc5b77a266..c17afaca63 100644
---
a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/listener/TaskExecutorLifecycleEventListener.java
+++
b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/listener/TaskExecutorLifecycleEventListener.java
@@ -79,7 +79,7 @@ public class TaskExecutorLifecycleEventListener implements
ITaskExecutorLifecycl
@Override
public void onTaskExecutorPausedLifecycleEvent(final
TaskExecutorPausedLifecycleEvent event) {
-
taskExecutorLifecycleEventReporter.reportTaskExecutorLifecycleEvent(event);
+ reportTaskExecutorLifecycleEventToMaster(event);
}
@Override
@@ -90,17 +90,17 @@ public class TaskExecutorLifecycleEventListener implements
ITaskExecutorLifecycl
@Override
public void onTaskExecutorKilledLifecycleEvent(final
TaskExecutorKilledLifecycleEvent event) {
-
taskExecutorLifecycleEventReporter.reportTaskExecutorLifecycleEvent(event);
+ reportTaskExecutorLifecycleEventToMaster(event);
}
@Override
public void onTaskExecutorSuccessLifecycleEvent(final
TaskExecutorSuccessLifecycleEvent event) {
-
taskExecutorLifecycleEventReporter.reportTaskExecutorLifecycleEvent(event);
+ reportTaskExecutorLifecycleEventToMaster(event);
}
@Override
public void
onTaskExecutorFailLifecycleEvent(TaskExecutorFailedLifecycleEvent event) {
-
taskExecutorLifecycleEventReporter.reportTaskExecutorLifecycleEvent(event);
+ reportTaskExecutorLifecycleEventToMaster(event);
}
@Override
diff --git
a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/worker/AbstractTaskExecutorWorker.java
b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/worker/AbstractTaskExecutorWorker.java
index eed62d8a9e..f9dff1caff 100644
---
a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/worker/AbstractTaskExecutorWorker.java
+++
b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/worker/AbstractTaskExecutorWorker.java
@@ -22,7 +22,6 @@ import static
com.google.common.base.Preconditions.checkNotNull;
import org.apache.dolphinscheduler.task.executor.ITaskExecutor;
import org.apache.dolphinscheduler.task.executor.TaskExecutorState;
import
org.apache.dolphinscheduler.task.executor.events.TaskExecutorFailedLifecycleEvent;
-import
org.apache.dolphinscheduler.task.executor.events.TaskExecutorFinalizeLifecycleEvent;
import
org.apache.dolphinscheduler.task.executor.events.TaskExecutorKilledLifecycleEvent;
import
org.apache.dolphinscheduler.task.executor.events.TaskExecutorPausedLifecycleEvent;
import
org.apache.dolphinscheduler.task.executor.events.TaskExecutorSuccessLifecycleEvent;
@@ -85,7 +84,6 @@ public abstract class AbstractTaskExecutorWorker implements
ITaskExecutorWorker
protected void onTaskExecutorFinished(final ITaskExecutor taskExecutor) {
unFireTaskExecutor(taskExecutor);
-
taskExecutor.getTaskExecutorEventBus().publish(TaskExecutorFinalizeLifecycleEvent.of(taskExecutor));
}
}
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/executor/PhysicalTaskEngineDelegator.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/executor/PhysicalTaskEngineDelegator.java
index be70d0893d..67aba113e6 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/executor/PhysicalTaskEngineDelegator.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/executor/PhysicalTaskEngineDelegator.java
@@ -23,6 +23,8 @@ import org.apache.dolphinscheduler.task.executor.TaskEngine;
import
org.apache.dolphinscheduler.task.executor.eventbus.ITaskExecutorLifecycleEventReporter;
import
org.apache.dolphinscheduler.task.executor.operations.TaskExecutorReassignMasterRequest;
+import java.util.Optional;
+
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@@ -75,10 +77,13 @@ public class PhysicalTaskEngineDelegator implements
AutoCloseable {
public boolean reassignWorkflowInstanceHost(final
TaskExecutorReassignMasterRequest taskExecutorReassignMasterRequest) {
final int taskInstanceId =
taskExecutorReassignMasterRequest.getTaskInstanceId();
final String workflowHost =
taskExecutorReassignMasterRequest.getWorkflowHost();
- // todo: Is this reassign can make sure there is no concurrent problem?
- physicalTaskExecutorRepository.get(taskInstanceId).ifPresent(
- taskExecutor ->
taskExecutor.getTaskExecutionContext().setWorkflowInstanceHost(workflowHost));
- return
physicalTaskExecutorEventReporter.reassignWorkflowInstanceHost(taskInstanceId,
workflowHost);
+ final Optional<ITaskExecutor> taskExecutorOptional =
physicalTaskExecutorRepository.get(taskInstanceId);
+ if (taskExecutorOptional.isPresent()) {
+
taskExecutorOptional.get().getTaskExecutionContext().setWorkflowInstanceHost(workflowHost);
+
physicalTaskExecutorEventReporter.onWorkflowInstanceHostChanged(taskInstanceId);
+ return true;
+ }
+ return false;
}
@Override
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/executor/PhysicalTaskExecutorLifecycleEventReporter.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/executor/PhysicalTaskExecutorLifecycleEventReporter.java
index 3be993adbe..fdb204b5d3 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/executor/PhysicalTaskExecutorLifecycleEventReporter.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/executor/PhysicalTaskExecutorLifecycleEventReporter.java
@@ -25,7 +25,9 @@ import org.springframework.stereotype.Component;
public class PhysicalTaskExecutorLifecycleEventReporter extends
TaskExecutorLifecycleEventRemoteReporter {
public PhysicalTaskExecutorLifecycleEventReporter(
- final
PhysicalTaskExecutorEventRemoteReporterClient
physicalTaskExecutorEventRemoteReporterClient) {
- super("PhysicalTaskExecutorLifecycleEventReporter",
physicalTaskExecutorEventRemoteReporterClient);
+ final
PhysicalTaskExecutorEventRemoteReporterClient
physicalTaskExecutorEventRemoteReporterClient,
+ final
PhysicalTaskExecutorRepository taskExecutorRepository) {
+ super("PhysicalTaskExecutorLifecycleEventReporter",
physicalTaskExecutorEventRemoteReporterClient,
+ taskExecutorRepository);
}
}