This is an automated email from the ASF dual-hosted git repository.

zihaoxiang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new e2a9e76e9e [Fix-17355] Fix reassignWorkflowInstanceHost may fail 
when no events in channel (#17372)
e2a9e76e9e is described below

commit e2a9e76e9e70328d3e7f250e31b66c2befefe77f
Author: lile <[email protected]>
AuthorDate: Tue Aug 5 13:37:17 2025 +0800

    [Fix-17355] Fix reassignWorkflowInstanceHost may fail when no events in 
channel (#17372)
---
 .../TaskExecutorEventRemoteReporterClient.java     | 58 ++++++++++++++--------
 .../LogicTaskExecutorLifecycleEventReporter.java   |  6 ++-
 .../ITaskExecutorEventRemoteReporterClient.java    |  3 +-
 .../ITaskExecutorLifecycleEventReporter.java       |  5 +-
 .../TaskExecutorLifecycleEventRemoteReporter.java  | 46 ++++++++++++++---
 .../IReportableTaskExecutorLifecycleEvent.java     | 10 ----
 .../TaskExecutorDispatchedLifecycleEvent.java      |  3 --
 .../events/TaskExecutorFailedLifecycleEvent.java   |  3 --
 .../events/TaskExecutorKilledLifecycleEvent.java   |  3 --
 .../events/TaskExecutorLifecycleEventType.java     |  6 +++
 .../events/TaskExecutorPausedLifecycleEvent.java   |  3 --
 ...xecutorRuntimeContextChangedLifecycleEvent.java |  3 --
 .../events/TaskExecutorStartedLifecycleEvent.java  |  3 --
 .../events/TaskExecutorSuccessLifecycleEvent.java  |  3 --
 .../TaskExecutorLifecycleEventListener.java        |  8 +--
 .../worker/AbstractTaskExecutorWorker.java         |  2 -
 .../executor/PhysicalTaskEngineDelegator.java      | 13 +++--
 ...PhysicalTaskExecutorLifecycleEventReporter.java |  6 ++-
 18 files changed, 106 insertions(+), 78 deletions(-)

diff --git 
a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/TaskExecutorEventRemoteReporterClient.java
 
b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/TaskExecutorEventRemoteReporterClient.java
index 955e44c662..cec9f8eab7 100644
--- 
a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/TaskExecutorEventRemoteReporterClient.java
+++ 
b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/TaskExecutorEventRemoteReporterClient.java
@@ -33,32 +33,39 @@ import lombok.extern.slf4j.Slf4j;
 @Slf4j
 public class TaskExecutorEventRemoteReporterClient implements 
ITaskExecutorEventRemoteReporterClient {
 
-    public void reportTaskExecutionEventToMaster(final 
IReportableTaskExecutorLifecycleEvent taskExecutorLifecycleEvent) {
+    @Override
+    public void reportTaskExecutionEventToMaster(final String masterAddress,
+                                                 final 
IReportableTaskExecutorLifecycleEvent taskExecutorLifecycleEvent) {
         try {
             
taskExecutorLifecycleEvent.setLatestReportTime(System.currentTimeMillis());
             switch (taskExecutorLifecycleEvent.getType()) {
                 case DISPATCHED:
-                    reportTaskDispatchedEventToMaster(
+                    reportTaskDispatchedEventToMaster(masterAddress,
                             (TaskExecutorDispatchedLifecycleEvent) 
taskExecutorLifecycleEvent);
                     break;
                 case RUNNING:
-                    
reportTaskRunningEventToMaster((TaskExecutorStartedLifecycleEvent) 
taskExecutorLifecycleEvent);
+                    reportTaskRunningEventToMaster(masterAddress,
+                            (TaskExecutorStartedLifecycleEvent) 
taskExecutorLifecycleEvent);
                     break;
                 case RUNTIME_CONTEXT_CHANGE:
-                    reportTaskRuntimeContextChangeEventToMaster(
+                    reportTaskRuntimeContextChangeEventToMaster(masterAddress,
                             (TaskExecutorRuntimeContextChangedLifecycleEvent) 
taskExecutorLifecycleEvent);
                     break;
                 case PAUSED:
-                    
reportTaskPausedEventToMaster((TaskExecutorPausedLifecycleEvent) 
taskExecutorLifecycleEvent);
+                    reportTaskPausedEventToMaster(masterAddress,
+                            (TaskExecutorPausedLifecycleEvent) 
taskExecutorLifecycleEvent);
                     break;
                 case KILLED:
-                    
reportTaskKilledEventToMaster((TaskExecutorKilledLifecycleEvent) 
taskExecutorLifecycleEvent);
+                    reportTaskKilledEventToMaster(masterAddress,
+                            (TaskExecutorKilledLifecycleEvent) 
taskExecutorLifecycleEvent);
                     break;
                 case FAILED:
-                    
reportTaskFailedEventToMaster((TaskExecutorFailedLifecycleEvent) 
taskExecutorLifecycleEvent);
+                    reportTaskFailedEventToMaster(masterAddress,
+                            (TaskExecutorFailedLifecycleEvent) 
taskExecutorLifecycleEvent);
                     break;
                 case SUCCESS:
-                    
reportTaskSuccessEventToMaster((TaskExecutorSuccessLifecycleEvent) 
taskExecutorLifecycleEvent);
+                    reportTaskSuccessEventToMaster(masterAddress,
+                            (TaskExecutorSuccessLifecycleEvent) 
taskExecutorLifecycleEvent);
                     break;
                 default:
                     log.warn("Unsupported TaskExecutionEvent: {}", 
taskExecutorLifecycleEvent);
@@ -69,52 +76,59 @@ public class TaskExecutorEventRemoteReporterClient 
implements ITaskExecutorEvent
         }
     }
 
-    private static void reportTaskDispatchedEventToMaster(final 
TaskExecutorDispatchedLifecycleEvent taskExecutionDispatchedEvent) {
+    private static void reportTaskDispatchedEventToMaster(final String 
masterAddress,
+                                                          final 
TaskExecutorDispatchedLifecycleEvent taskExecutionDispatchedEvent) {
         Clients
                 .withService(ITaskExecutorEventListener.class)
-                
.withHost(taskExecutionDispatchedEvent.getWorkflowInstanceHost())
+                .withHost(masterAddress)
                 .onTaskExecutorDispatched(taskExecutionDispatchedEvent);
     }
 
-    private static void reportTaskRunningEventToMaster(final 
TaskExecutorStartedLifecycleEvent taskExecutionRunningEvent) {
+    private static void reportTaskRunningEventToMaster(final String 
masterAddress,
+                                                       final 
TaskExecutorStartedLifecycleEvent taskExecutionRunningEvent) {
         Clients
                 .withService(ITaskExecutorEventListener.class)
-                .withHost(taskExecutionRunningEvent.getWorkflowInstanceHost())
+                .withHost(masterAddress)
                 .onTaskExecutorRunning(taskExecutionRunningEvent);
     }
 
-    private static void reportTaskRuntimeContextChangeEventToMaster(final 
TaskExecutorRuntimeContextChangedLifecycleEvent taskExecutorLifecycleEvent) {
+    private static void reportTaskRuntimeContextChangeEventToMaster(final 
String masterAddress,
+                                                                    final 
TaskExecutorRuntimeContextChangedLifecycleEvent taskExecutorLifecycleEvent) {
         Clients
                 .withService(ITaskExecutorEventListener.class)
-                .withHost(taskExecutorLifecycleEvent.getWorkflowInstanceHost())
+                .withHost(masterAddress)
                 
.onTaskExecutorRuntimeContextChanged(taskExecutorLifecycleEvent);
     }
 
-    private static void reportTaskPausedEventToMaster(final 
TaskExecutorPausedLifecycleEvent taskExecutionPausedEvent) {
+    private static void reportTaskPausedEventToMaster(final String 
masterAddress,
+                                                      final 
TaskExecutorPausedLifecycleEvent taskExecutionPausedEvent) {
         Clients
                 .withService(ITaskExecutorEventListener.class)
-                .withHost(taskExecutionPausedEvent.getWorkflowInstanceHost())
+                .withHost(masterAddress)
                 .onTaskExecutorPaused(taskExecutionPausedEvent);
     }
 
-    private static void reportTaskKilledEventToMaster(final 
TaskExecutorKilledLifecycleEvent taskExecutionKilledEvent) {
+    private static void reportTaskKilledEventToMaster(final String 
masterAddress,
+                                                      final 
TaskExecutorKilledLifecycleEvent taskExecutionKilledEvent) {
         Clients
                 .withService(ITaskExecutorEventListener.class)
-                .withHost(taskExecutionKilledEvent.getWorkflowInstanceHost())
+                .withHost(masterAddress)
                 .onTaskExecutorKilled(taskExecutionKilledEvent);
     }
 
-    private static void reportTaskFailedEventToMaster(final 
TaskExecutorFailedLifecycleEvent taskExecutionFailedEvent) {
+    private static void reportTaskFailedEventToMaster(final String 
masterAddress,
+                                                      final 
TaskExecutorFailedLifecycleEvent taskExecutionFailedEvent) {
         Clients
                 .withService(ITaskExecutorEventListener.class)
-                .withHost(taskExecutionFailedEvent.getWorkflowInstanceHost())
+                .withHost(masterAddress)
                 .onTaskExecutorFailed(taskExecutionFailedEvent);
     }
 
-    private static void reportTaskSuccessEventToMaster(final 
TaskExecutorSuccessLifecycleEvent taskExecutionSuccessEvent) {
+    private static void reportTaskSuccessEventToMaster(final String 
masterAddress,
+                                                       final 
TaskExecutorSuccessLifecycleEvent taskExecutionSuccessEvent) {
         Clients
                 .withService(ITaskExecutorEventListener.class)
-                .withHost(taskExecutionSuccessEvent.getWorkflowInstanceHost())
+                .withHost(masterAddress)
                 .onTaskExecutorSuccess(taskExecutionSuccessEvent);
     }
 }
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/LogicTaskExecutorLifecycleEventReporter.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/LogicTaskExecutorLifecycleEventReporter.java
index 41a80ba716..fd2a32313a 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/LogicTaskExecutorLifecycleEventReporter.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/LogicTaskExecutorLifecycleEventReporter.java
@@ -25,7 +25,9 @@ import org.springframework.stereotype.Component;
 public class LogicTaskExecutorLifecycleEventReporter extends 
TaskExecutorLifecycleEventRemoteReporter {
 
     public LogicTaskExecutorLifecycleEventReporter(
-                                                   final 
LogicTaskExecutorEventRemoteReporterClient 
logicTaskExecutorEventRemoteReporterClient) {
-        super("LogicTaskExecutorLifecycleEventReporter", 
logicTaskExecutorEventRemoteReporterClient);
+                                                   final 
LogicTaskExecutorEventRemoteReporterClient 
logicTaskExecutorEventRemoteReporterClient,
+                                                   final 
LogicTaskExecutorRepository taskExecutorRepository) {
+        super("LogicTaskExecutorLifecycleEventReporter", 
logicTaskExecutorEventRemoteReporterClient,
+                taskExecutorRepository);
     }
 }
diff --git 
a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/eventbus/ITaskExecutorEventRemoteReporterClient.java
 
b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/eventbus/ITaskExecutorEventRemoteReporterClient.java
index b7888df626..e7bfb866d8 100644
--- 
a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/eventbus/ITaskExecutorEventRemoteReporterClient.java
+++ 
b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/eventbus/ITaskExecutorEventRemoteReporterClient.java
@@ -21,5 +21,6 @@ import 
org.apache.dolphinscheduler.task.executor.events.IReportableTaskExecutorL
 
 public interface ITaskExecutorEventRemoteReporterClient {
 
-    void reportTaskExecutionEventToMaster(final 
IReportableTaskExecutorLifecycleEvent reportableTaskExecutorLifecycleEvent);
+    void reportTaskExecutionEventToMaster(final String masterAddress,
+                                          final 
IReportableTaskExecutorLifecycleEvent reportableTaskExecutorLifecycleEvent);
 }
diff --git 
a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/eventbus/ITaskExecutorLifecycleEventReporter.java
 
b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/eventbus/ITaskExecutorLifecycleEventReporter.java
index 0eed5359c3..94e5737ca9 100644
--- 
a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/eventbus/ITaskExecutorLifecycleEventReporter.java
+++ 
b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/eventbus/ITaskExecutorLifecycleEventReporter.java
@@ -48,10 +48,9 @@ public interface ITaskExecutorLifecycleEventReporter extends 
AutoCloseable {
     void receiveTaskExecutorLifecycleEventACK(final 
TaskExecutorLifecycleEventAck taskExecutorLifecycleEventAck);
 
     /**
-     * Reassign the workflow instance host of the 
IReportableTaskExecutorLifecycleEvent.
-     * <p> This method is used to reassign the workflow instance host of the 
IReportableTaskExecutorLifecycleEvent, once the workflow's host changed.
+     * Reset the events in the channel to allow them to be reported 
immediately.
      */
-    boolean reassignWorkflowInstanceHost(int taskInstanceId, String 
workflowHost);
+    void onWorkflowInstanceHostChanged(int taskInstanceId);
 
     /**
      * Shutdown the reporter.
diff --git 
a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/eventbus/TaskExecutorLifecycleEventRemoteReporter.java
 
b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/eventbus/TaskExecutorLifecycleEventRemoteReporter.java
index b5187c8399..d83de7f98d 100644
--- 
a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/eventbus/TaskExecutorLifecycleEventRemoteReporter.java
+++ 
b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/eventbus/TaskExecutorLifecycleEventRemoteReporter.java
@@ -17,13 +17,18 @@
 
 package org.apache.dolphinscheduler.task.executor.eventbus;
 
+import org.apache.dolphinscheduler.common.exception.BaseException;
 import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.task.executor.ITaskExecutor;
+import org.apache.dolphinscheduler.task.executor.ITaskExecutorRepository;
 import 
org.apache.dolphinscheduler.task.executor.events.IReportableTaskExecutorLifecycleEvent;
+import 
org.apache.dolphinscheduler.task.executor.events.TaskExecutorFinalizeLifecycleEvent;
 import 
org.apache.dolphinscheduler.task.executor.events.TaskExecutorLifecycleEventType;
 import org.apache.dolphinscheduler.task.executor.log.TaskExecutorMDCUtils;
 
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
@@ -56,11 +61,15 @@ public class TaskExecutorLifecycleEventRemoteReporter 
extends BaseDaemonThread
 
     private final Condition taskExecutionEventEmptyCondition = 
eventChannelsLock.newCondition();
 
+    private final ITaskExecutorRepository taskExecutorRepository;
+
     public TaskExecutorLifecycleEventRemoteReporter(final String reporterName,
-                                                    final 
ITaskExecutorEventRemoteReporterClient taskExecutorEventRemoteReporterClient) {
+                                                    final 
ITaskExecutorEventRemoteReporterClient taskExecutorEventRemoteReporterClient,
+                                                    final 
ITaskExecutorRepository taskExecutorRepository) {
         super(reporterName);
         this.reporterName = reporterName;
         this.taskExecutorEventRemoteReporterClient = 
taskExecutorEventRemoteReporterClient;
+        this.taskExecutorRepository = taskExecutorRepository;
     }
 
     @Override
@@ -128,6 +137,11 @@ public class TaskExecutorLifecycleEventRemoteReporter 
extends BaseDaemonThread
                 log.info("Failed removed ReportableTaskExecutorLifecycleEvent 
by ack: {}", eventAck);
             }
             if (eventChannel.isEmpty()) {
+                // Extend the lifecycle of the TaskExecutor to span the entire 
processing cycle of the task.
+                // so we can finalize the TaskExecutor after the associated 
channel has been removed.
+                if (removed != null && removed.getType().isFinished()) {
+                    finalizeTaskExecutor(removed.getTaskInstanceId());
+                }
                 eventChannels.remove(taskExecutorId);
                 log.debug("Removed 
ReportableTaskExecutorLifecycleEventChannel: {}", taskExecutorId);
             }
@@ -138,15 +152,14 @@ public class TaskExecutorLifecycleEventRemoteReporter 
extends BaseDaemonThread
     }
 
     @Override
-    public boolean reassignWorkflowInstanceHost(int taskInstanceId, String 
workflowHost) {
+    public void onWorkflowInstanceHostChanged(int taskInstanceId) {
         eventChannelsLock.lock();
         try {
             final ReportableTaskExecutorLifecycleEventChannel eventChannel = 
eventChannels.get(taskInstanceId);
-            if (eventChannel == null) {
-                return false;
+            if (eventChannel != null) {
+                eventChannel.taskExecutionEventsQueue.forEach(event -> 
event.setLatestReportTime(null));
+                taskExecutionEventEmptyCondition.signalAll();
             }
-            eventChannel.taskExecutionEventsQueue.forEach(event -> 
event.setWorkflowInstanceHost(workflowHost));
-            return true;
         } finally {
             eventChannelsLock.unlock();
         }
@@ -164,6 +177,16 @@ public class TaskExecutorLifecycleEventRemoteReporter 
extends BaseDaemonThread
         return eventChannels;
     }
 
+    private void finalizeTaskExecutor(final Integer taskExecutorId) {
+        final Optional<ITaskExecutor> taskExecutorOptional = 
taskExecutorRepository.get(taskExecutorId);
+        if (taskExecutorOptional.isPresent()) {
+            taskExecutorOptional.get().getTaskExecutorEventBus()
+                    
.publish(TaskExecutorFinalizeLifecycleEvent.of(taskExecutorOptional.get()));
+        } else {
+            log.warn("TaskExecutor is not exists: {}", taskExecutorId);
+        }
+    }
+
     private void handleTaskExecutionEventChannel(final 
ReportableTaskExecutorLifecycleEventChannel 
reportableTaskExecutorLifecycleEventChannel) {
         if (reportableTaskExecutorLifecycleEventChannel.isEmpty()) {
             return;
@@ -175,7 +198,16 @@ public class TaskExecutorLifecycleEventRemoteReporter 
extends BaseDaemonThread
                             
TaskExecutorMDCUtils.logWithMDC(headEvent.getTaskInstanceId())) {
                 try {
                     if (isTaskExecutorEventNeverSent(headEvent) || 
isRetryIntervalExceeded(headEvent)) {
-                        
taskExecutorEventRemoteReporterClient.reportTaskExecutionEventToMaster(headEvent);
+                        final Optional<ITaskExecutor> taskExecutorOptional =
+                                
taskExecutorRepository.get(headEvent.getTaskInstanceId());
+                        if (!taskExecutorOptional.isPresent()) {
+                            throw new BaseException(String.format("The 
TaskExecutor id %d is not exist.",
+                                    headEvent.getTaskInstanceId()));
+                        }
+                        final String masterAddress =
+                                
taskExecutorOptional.get().getTaskExecutionContext().getWorkflowInstanceHost();
+                        
taskExecutorEventRemoteReporterClient.reportTaskExecutionEventToMaster(masterAddress,
+                                headEvent);
                         continue;
                     }
                     if (log.isDebugEnabled()) {
diff --git 
a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/IReportableTaskExecutorLifecycleEvent.java
 
b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/IReportableTaskExecutorLifecycleEvent.java
index 795945c7f2..567927cc92 100644
--- 
a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/IReportableTaskExecutorLifecycleEvent.java
+++ 
b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/IReportableTaskExecutorLifecycleEvent.java
@@ -29,16 +29,6 @@ public interface IReportableTaskExecutorLifecycleEvent 
extends ITaskExecutorLife
      */
     int getWorkflowInstanceId();
 
-    /**
-     * The host of the workflow instance which the event should report to.
-     */
-    String getWorkflowInstanceHost();
-
-    /**
-     * Set the host of the workflow instance which the event should report to.
-     */
-    void setWorkflowInstanceHost(String workflowInstanceHost);
-
     /**
      * Get the latest report time of the event, if the event is never 
reported, return null.
      */
diff --git 
a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorDispatchedLifecycleEvent.java
 
b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorDispatchedLifecycleEvent.java
index e7daf61e15..9424d3bb27 100644
--- 
a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorDispatchedLifecycleEvent.java
+++ 
b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorDispatchedLifecycleEvent.java
@@ -37,8 +37,6 @@ public class TaskExecutorDispatchedLifecycleEvent extends 
AbstractTaskExecutorLi
 
     private int workflowInstanceId;
 
-    private String workflowInstanceHost;
-
     private String taskInstanceHost;
 
     private Long latestReportTime;
@@ -49,7 +47,6 @@ public class TaskExecutorDispatchedLifecycleEvent extends 
AbstractTaskExecutorLi
                 .taskInstanceId(taskExecutionContext.getTaskInstanceId())
                 
.workflowInstanceId(taskExecutionContext.getWorkflowInstanceId())
                 .taskInstanceHost(taskExecutionContext.getHost())
-                
.workflowInstanceHost(taskExecutionContext.getWorkflowInstanceHost())
                 .type(TaskExecutorLifecycleEventType.DISPATCHED)
                 .build();
     }
diff --git 
a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorFailedLifecycleEvent.java
 
b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorFailedLifecycleEvent.java
index a77795de3a..493853b223 100644
--- 
a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorFailedLifecycleEvent.java
+++ 
b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorFailedLifecycleEvent.java
@@ -37,8 +37,6 @@ public class TaskExecutorFailedLifecycleEvent extends 
AbstractTaskExecutorLifecy
 
     private int workflowInstanceId;
 
-    private String workflowInstanceHost;
-
     private String taskInstanceHost;
 
     private String appIds;
@@ -53,7 +51,6 @@ public class TaskExecutorFailedLifecycleEvent extends 
AbstractTaskExecutorLifecy
                 .taskInstanceId(taskExecutor.getId())
                 
.workflowInstanceId(taskExecutionContext.getWorkflowInstanceId())
                 .taskInstanceHost(taskExecutionContext.getHost())
-                
.workflowInstanceHost(taskExecutionContext.getWorkflowInstanceHost())
                 .appIds(taskExecutionContext.getAppIds())
                 .endTime(taskExecutionContext.getEndTime())
                 .type(TaskExecutorLifecycleEventType.FAILED)
diff --git 
a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorKilledLifecycleEvent.java
 
b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorKilledLifecycleEvent.java
index d587aca68b..606effc03e 100644
--- 
a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorKilledLifecycleEvent.java
+++ 
b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorKilledLifecycleEvent.java
@@ -37,8 +37,6 @@ public class TaskExecutorKilledLifecycleEvent extends 
AbstractTaskExecutorLifecy
 
     private int workflowInstanceId;
 
-    private String workflowInstanceHost;
-
     private String taskInstanceHost;
 
     private long endTime;
@@ -51,7 +49,6 @@ public class TaskExecutorKilledLifecycleEvent extends 
AbstractTaskExecutorLifecy
                 .taskInstanceId(taskExecutionContext.getTaskInstanceId())
                 
.workflowInstanceId(taskExecutionContext.getWorkflowInstanceId())
                 .taskInstanceHost(taskExecutionContext.getHost())
-                
.workflowInstanceHost(taskExecutionContext.getWorkflowInstanceHost())
                 .endTime(taskExecutionContext.getEndTime())
                 .type(TaskExecutorLifecycleEventType.KILLED)
                 .build();
diff --git 
a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorLifecycleEventType.java
 
b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorLifecycleEventType.java
index 88fa77835a..aa0aa2cd4e 100644
--- 
a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorLifecycleEventType.java
+++ 
b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorLifecycleEventType.java
@@ -40,4 +40,10 @@ public enum TaskExecutorLifecycleEventType {
     FINALIZE,
     ;
 
+    public boolean isFinished() {
+        return (this == KILLED
+                || this == PAUSED
+                || this == FAILED
+                || this == SUCCESS);
+    }
 }
diff --git 
a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorPausedLifecycleEvent.java
 
b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorPausedLifecycleEvent.java
index 0057fb95d8..c81699148e 100644
--- 
a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorPausedLifecycleEvent.java
+++ 
b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorPausedLifecycleEvent.java
@@ -37,8 +37,6 @@ public class TaskExecutorPausedLifecycleEvent extends 
AbstractTaskExecutorLifecy
 
     private int workflowInstanceId;
 
-    private String workflowInstanceHost;
-
     private String taskInstanceHost;
 
     private long endTime;
@@ -51,7 +49,6 @@ public class TaskExecutorPausedLifecycleEvent extends 
AbstractTaskExecutorLifecy
                 .taskInstanceId(taskExecutionContext.getTaskInstanceId())
                 
.workflowInstanceId(taskExecutionContext.getWorkflowInstanceId())
                 .taskInstanceHost(taskExecutionContext.getHost())
-                
.workflowInstanceHost(taskExecutionContext.getWorkflowInstanceHost())
                 .endTime(taskExecutionContext.getEndTime())
                 .type(TaskExecutorLifecycleEventType.PAUSED)
                 .build();
diff --git 
a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorRuntimeContextChangedLifecycleEvent.java
 
b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorRuntimeContextChangedLifecycleEvent.java
index 1febc2aea6..f759e0da9e 100644
--- 
a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorRuntimeContextChangedLifecycleEvent.java
+++ 
b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorRuntimeContextChangedLifecycleEvent.java
@@ -37,8 +37,6 @@ public class TaskExecutorRuntimeContextChangedLifecycleEvent 
extends AbstractTas
 
     private int workflowInstanceId;
 
-    private String workflowInstanceHost;
-
     private String taskInstanceHost;
 
     @Deprecated
@@ -53,7 +51,6 @@ public class TaskExecutorRuntimeContextChangedLifecycleEvent 
extends AbstractTas
         return TaskExecutorRuntimeContextChangedLifecycleEvent.builder()
                 .taskInstanceId(taskExecutionContext.getTaskInstanceId())
                 
.workflowInstanceId(taskExecutionContext.getWorkflowInstanceId())
-                
.workflowInstanceHost(taskExecutionContext.getWorkflowInstanceHost())
                 .taskInstanceHost(taskExecutionContext.getHost())
                 .processId(taskExecutionContext.getProcessId())
                 .appIds(taskExecutionContext.getAppIds())
diff --git 
a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorStartedLifecycleEvent.java
 
b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorStartedLifecycleEvent.java
index 6045f93e90..94126bb39d 100644
--- 
a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorStartedLifecycleEvent.java
+++ 
b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorStartedLifecycleEvent.java
@@ -37,8 +37,6 @@ public class TaskExecutorStartedLifecycleEvent extends 
AbstractTaskExecutorLifec
 
     private int workflowInstanceId;
 
-    private String workflowInstanceHost;
-
     private String taskInstanceHost;
 
     private long startTime;
@@ -57,7 +55,6 @@ public class TaskExecutorStartedLifecycleEvent extends 
AbstractTaskExecutorLifec
                 
.workflowInstanceId(taskExecutionContext.getWorkflowInstanceId())
                 .taskInstanceHost(taskExecutionContext.getHost())
                 
.startTime(taskExecutor.getTaskExecutionContext().getStartTime())
-                
.workflowInstanceHost(taskExecutionContext.getWorkflowInstanceHost())
                 .logPath(taskExecutionContext.getLogPath())
                 .executePath(taskExecutionContext.getExecutePath())
                 .type(TaskExecutorLifecycleEventType.RUNNING)
diff --git 
a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorSuccessLifecycleEvent.java
 
b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorSuccessLifecycleEvent.java
index 92a9e66e77..947d1edf36 100644
--- 
a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorSuccessLifecycleEvent.java
+++ 
b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorSuccessLifecycleEvent.java
@@ -40,8 +40,6 @@ public class TaskExecutorSuccessLifecycleEvent extends 
AbstractTaskExecutorLifec
 
     private int workflowInstanceId;
 
-    private String workflowInstanceHost;
-
     private String taskInstanceHost;
 
     private long endTime;
@@ -54,7 +52,6 @@ public class TaskExecutorSuccessLifecycleEvent extends 
AbstractTaskExecutorLifec
         final TaskExecutionContext taskExecutionContext = 
taskExecutor.getTaskExecutionContext();
         return TaskExecutorSuccessLifecycleEvent.builder()
                 
.workflowInstanceId(taskExecutionContext.getWorkflowInstanceId())
-                
.workflowInstanceHost(taskExecutionContext.getWorkflowInstanceHost())
                 .taskInstanceId(taskExecutionContext.getTaskInstanceId())
                 .taskInstanceHost(taskExecutionContext.getHost())
                 .varPool(taskExecutionContext.getVarPool())
diff --git 
a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/listener/TaskExecutorLifecycleEventListener.java
 
b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/listener/TaskExecutorLifecycleEventListener.java
index bc5b77a266..c17afaca63 100644
--- 
a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/listener/TaskExecutorLifecycleEventListener.java
+++ 
b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/listener/TaskExecutorLifecycleEventListener.java
@@ -79,7 +79,7 @@ public class TaskExecutorLifecycleEventListener implements 
ITaskExecutorLifecycl
 
     @Override
     public void onTaskExecutorPausedLifecycleEvent(final 
TaskExecutorPausedLifecycleEvent event) {
-        
taskExecutorLifecycleEventReporter.reportTaskExecutorLifecycleEvent(event);
+        reportTaskExecutorLifecycleEventToMaster(event);
     }
 
     @Override
@@ -90,17 +90,17 @@ public class TaskExecutorLifecycleEventListener implements 
ITaskExecutorLifecycl
 
     @Override
     public void onTaskExecutorKilledLifecycleEvent(final 
TaskExecutorKilledLifecycleEvent event) {
-        
taskExecutorLifecycleEventReporter.reportTaskExecutorLifecycleEvent(event);
+        reportTaskExecutorLifecycleEventToMaster(event);
     }
 
     @Override
     public void onTaskExecutorSuccessLifecycleEvent(final 
TaskExecutorSuccessLifecycleEvent event) {
-        
taskExecutorLifecycleEventReporter.reportTaskExecutorLifecycleEvent(event);
+        reportTaskExecutorLifecycleEventToMaster(event);
     }
 
     @Override
     public void 
onTaskExecutorFailLifecycleEvent(TaskExecutorFailedLifecycleEvent event) {
-        
taskExecutorLifecycleEventReporter.reportTaskExecutorLifecycleEvent(event);
+        reportTaskExecutorLifecycleEventToMaster(event);
     }
 
     @Override
diff --git 
a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/worker/AbstractTaskExecutorWorker.java
 
b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/worker/AbstractTaskExecutorWorker.java
index eed62d8a9e..f9dff1caff 100644
--- 
a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/worker/AbstractTaskExecutorWorker.java
+++ 
b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/worker/AbstractTaskExecutorWorker.java
@@ -22,7 +22,6 @@ import static 
com.google.common.base.Preconditions.checkNotNull;
 import org.apache.dolphinscheduler.task.executor.ITaskExecutor;
 import org.apache.dolphinscheduler.task.executor.TaskExecutorState;
 import 
org.apache.dolphinscheduler.task.executor.events.TaskExecutorFailedLifecycleEvent;
-import 
org.apache.dolphinscheduler.task.executor.events.TaskExecutorFinalizeLifecycleEvent;
 import 
org.apache.dolphinscheduler.task.executor.events.TaskExecutorKilledLifecycleEvent;
 import 
org.apache.dolphinscheduler.task.executor.events.TaskExecutorPausedLifecycleEvent;
 import 
org.apache.dolphinscheduler.task.executor.events.TaskExecutorSuccessLifecycleEvent;
@@ -85,7 +84,6 @@ public abstract class AbstractTaskExecutorWorker implements 
ITaskExecutorWorker
 
     protected void onTaskExecutorFinished(final ITaskExecutor taskExecutor) {
         unFireTaskExecutor(taskExecutor);
-        
taskExecutor.getTaskExecutorEventBus().publish(TaskExecutorFinalizeLifecycleEvent.of(taskExecutor));
     }
 
 }
diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/executor/PhysicalTaskEngineDelegator.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/executor/PhysicalTaskEngineDelegator.java
index be70d0893d..67aba113e6 100644
--- 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/executor/PhysicalTaskEngineDelegator.java
+++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/executor/PhysicalTaskEngineDelegator.java
@@ -23,6 +23,8 @@ import org.apache.dolphinscheduler.task.executor.TaskEngine;
 import 
org.apache.dolphinscheduler.task.executor.eventbus.ITaskExecutorLifecycleEventReporter;
 import 
org.apache.dolphinscheduler.task.executor.operations.TaskExecutorReassignMasterRequest;
 
+import java.util.Optional;
+
 import lombok.extern.slf4j.Slf4j;
 
 import org.springframework.stereotype.Component;
@@ -75,10 +77,13 @@ public class PhysicalTaskEngineDelegator implements 
AutoCloseable {
     public boolean reassignWorkflowInstanceHost(final 
TaskExecutorReassignMasterRequest taskExecutorReassignMasterRequest) {
         final int taskInstanceId = 
taskExecutorReassignMasterRequest.getTaskInstanceId();
         final String workflowHost = 
taskExecutorReassignMasterRequest.getWorkflowHost();
-        // todo: Is this reassign can make sure there is no concurrent problem?
-        physicalTaskExecutorRepository.get(taskInstanceId).ifPresent(
-                taskExecutor -> 
taskExecutor.getTaskExecutionContext().setWorkflowInstanceHost(workflowHost));
-        return 
physicalTaskExecutorEventReporter.reassignWorkflowInstanceHost(taskInstanceId, 
workflowHost);
+        final Optional<ITaskExecutor> taskExecutorOptional = 
physicalTaskExecutorRepository.get(taskInstanceId);
+        if (taskExecutorOptional.isPresent()) {
+            
taskExecutorOptional.get().getTaskExecutionContext().setWorkflowInstanceHost(workflowHost);
+            
physicalTaskExecutorEventReporter.onWorkflowInstanceHostChanged(taskInstanceId);
+            return true;
+        }
+        return false;
     }
 
     @Override
diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/executor/PhysicalTaskExecutorLifecycleEventReporter.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/executor/PhysicalTaskExecutorLifecycleEventReporter.java
index 3be993adbe..fdb204b5d3 100644
--- 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/executor/PhysicalTaskExecutorLifecycleEventReporter.java
+++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/executor/PhysicalTaskExecutorLifecycleEventReporter.java
@@ -25,7 +25,9 @@ import org.springframework.stereotype.Component;
 public class PhysicalTaskExecutorLifecycleEventReporter extends 
TaskExecutorLifecycleEventRemoteReporter {
 
     public PhysicalTaskExecutorLifecycleEventReporter(
-                                                      final 
PhysicalTaskExecutorEventRemoteReporterClient 
physicalTaskExecutorEventRemoteReporterClient) {
-        super("PhysicalTaskExecutorLifecycleEventReporter", 
physicalTaskExecutorEventRemoteReporterClient);
+                                                      final 
PhysicalTaskExecutorEventRemoteReporterClient 
physicalTaskExecutorEventRemoteReporterClient,
+                                                      final 
PhysicalTaskExecutorRepository taskExecutorRepository) {
+        super("PhysicalTaskExecutorLifecycleEventReporter", 
physicalTaskExecutorEventRemoteReporterClient,
+                taskExecutorRepository);
     }
 }


Reply via email to