caishunfeng commented on code in PR #16790:
URL: 
https://github.com/apache/dolphinscheduler/pull/16790#discussion_r1863011852


##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/plugin/dynamic/DynamicAsyncTaskExecuteFunction.java:
##########
@@ -143,7 +142,7 @@ private void setOutputParameters() {
 
         List<Property> taskPropertyList = new 
ArrayList<>(JSONUtils.toList(taskInstance.getVarPool(), Property.class));
         taskPropertyList.add(property);
-        
logicTask.getTaskParameters().setVarPool(JSONUtils.toJsonString(taskPropertyList));
+        // 
logicTask.getTaskParameters().setVarPool(JSONUtils.toJsonString(taskPropertyList));

Review Comment:
   Why is this code commented out?



##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/lifecycle/TaskLifecycleEventType.java:
##########
@@ -38,6 +38,10 @@ public enum TaskLifecycleEventType implements 
ILifecycleEventType {
      * The task instance is running at the target executor server.
      */
     RUNNING,
+    /**
+     * The task instance's runtime context changed.
+     */
+    RUNTIME_CONTEXT_CHANGED,

Review Comment:
   Why do we need to add this event type? The runtime context should be 
checked in the running event or the finish event.



##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/client/PhysicalTaskExecutorClientDelegator.java:
##########
@@ -21,26 +21,108 @@
 
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.extract.base.client.Clients;
-import org.apache.dolphinscheduler.extract.worker.ITaskInstanceOperator;
-import 
org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceKillRequest;
-import 
org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceKillResponse;
-import 
org.apache.dolphinscheduler.extract.worker.transportor.TaskInstancePauseRequest;
-import 
org.apache.dolphinscheduler.extract.worker.transportor.TaskInstancePauseResponse;
+import org.apache.dolphinscheduler.extract.base.utils.Host;
+import 
org.apache.dolphinscheduler.extract.worker.IPhysicalTaskExecutorOperator;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import 
org.apache.dolphinscheduler.server.master.cluster.loadbalancer.IWorkerLoadBalancer;
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import 
org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
+import 
org.apache.dolphinscheduler.server.master.exception.dispatch.TaskDispatchException;
+import 
org.apache.dolphinscheduler.task.executor.eventbus.ITaskExecutorLifecycleEventReporter;
+import 
org.apache.dolphinscheduler.task.executor.operations.TaskExecutorDispatchRequest;
+import 
org.apache.dolphinscheduler.task.executor.operations.TaskExecutorDispatchResponse;
+import 
org.apache.dolphinscheduler.task.executor.operations.TaskExecutorKillRequest;
+import 
org.apache.dolphinscheduler.task.executor.operations.TaskExecutorKillResponse;
+import 
org.apache.dolphinscheduler.task.executor.operations.TaskExecutorPauseRequest;
+import 
org.apache.dolphinscheduler.task.executor.operations.TaskExecutorPauseResponse;
+import 
org.apache.dolphinscheduler.task.executor.operations.TaskExecutorReassignMasterRequest;
+import 
org.apache.dolphinscheduler.task.executor.operations.TaskExecutorReassignMasterResponse;
 
 import org.apache.commons.lang3.StringUtils;
 
 import lombok.extern.slf4j.Slf4j;
 
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
 @Slf4j
 @Component
 public class PhysicalTaskExecutorClientDelegator implements 
ITaskExecutorClientDelegator {
 
+    @Autowired
+    private MasterConfig masterConfig;
+
+    @Autowired
+    private IWorkerLoadBalancer workerLoadBalancer;
+
     @Override
-    public void dispatch(final ITaskExecutionRunnable taskExecutionRunnable) {
+    public void dispatch(final ITaskExecutionRunnable taskExecutionRunnable) 
throws TaskDispatchException {
+        final TaskExecutionContext taskExecutionContext = 
taskExecutionRunnable.getTaskExecutionContext();
+        final String taskName = taskExecutionContext.getTaskName();
+        final String physicalTaskExecutorAddress = workerLoadBalancer
+                .select(taskExecutionContext.getWorkerGroup())
+                .map(Host::of)
+                .map(Host::getAddress)
+                .orElseThrow(() -> new TaskDispatchException(
+                        String.format("Cannot find the host to dispatch 
Task[id=%s, name=%s]",
+                                taskExecutionContext.getTaskInstanceId(), 
taskName)));
+
+        taskExecutionContext.setHost(physicalTaskExecutorAddress);
+        
taskExecutionRunnable.getTaskInstance().setHost(physicalTaskExecutorAddress);
+
+        try {
+            final TaskExecutorDispatchResponse taskExecutorDispatchResponse = 
Clients
+                    .withService(IPhysicalTaskExecutorOperator.class)
+                    .withHost(physicalTaskExecutorAddress)
+                    
.dispatchTask(TaskExecutorDispatchRequest.of(taskExecutionRunnable.getTaskExecutionContext()));
+            if (!taskExecutorDispatchResponse.isDispatchSuccess()) {
+                throw new TaskDispatchException(
+                        "Dispatch task: " + taskName + " to " + 
physicalTaskExecutorAddress + " failed: "
+                                + taskExecutorDispatchResponse);
+            }
+        } catch (TaskDispatchException e) {
+            throw e;
+        } catch (Exception e) {
+            throw new TaskDispatchException(
+                    "Dispatch task: " + taskName + " to " + 
physicalTaskExecutorAddress + " failed", e);
+        }
+    }
+
+    @Override
+    public boolean reassignMasterHost(final ITaskExecutionRunnable 
taskExecutionRunnable) {
+        final String taskName = taskExecutionRunnable.getName();
+        checkArgument(taskExecutionRunnable.isTaskInstanceInitialized(),
+                "Task " + taskName + "is not initialized cannot take-over");
+
+        final TaskInstance taskInstance = 
taskExecutionRunnable.getTaskInstance();
+        final String taskExecutorHost = taskInstance.getHost();
+        if (StringUtils.isEmpty(taskExecutorHost)) {
+            log.debug(
+                    "The task executor: {} host is empty, cannot take-over, 
this might caused by the task hasn't dispatched",
+                    taskName);
+            return false;
+        }
 
+        final TaskExecutorReassignMasterRequest 
taskExecutorReassignMasterRequest =
+                TaskExecutorReassignMasterRequest.builder()
+                        .taskInstanceId(taskInstance.getId())
+                        .workflowHost(masterConfig.getMasterAddress())
+                        .build();
+        final TaskExecutorReassignMasterResponse 
taskExecutorReassignMasterResponse =
+                Clients
+                        .withService(IPhysicalTaskExecutorOperator.class)
+                        .withHost(taskInstance.getHost())
+                        
.reassignWorkflowInstanceHost(taskExecutorReassignMasterRequest);
+        boolean success = taskExecutorReassignMasterResponse.isSuccess();
+        if (success) {
+            log.info("Reassign master host {} to {} successfully", 
taskExecutorHost, taskName);
+        } else {
+            log.info("Reassign master host {} on {} failed with response {}",

Review Comment:
   ```suggestion
               log.error("Reassign master host {} on {} failed with response 
{}",
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to