ruanwenjun commented on code in PR #16790:
URL:
https://github.com/apache/dolphinscheduler/pull/16790#discussion_r1863040844
##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/client/PhysicalTaskExecutorClientDelegator.java:
##########
@@ -21,26 +21,108 @@
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.extract.base.client.Clients;
-import org.apache.dolphinscheduler.extract.worker.ITaskInstanceOperator;
-import
org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceKillRequest;
-import
org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceKillResponse;
-import
org.apache.dolphinscheduler.extract.worker.transportor.TaskInstancePauseRequest;
-import
org.apache.dolphinscheduler.extract.worker.transportor.TaskInstancePauseResponse;
+import org.apache.dolphinscheduler.extract.base.utils.Host;
+import
org.apache.dolphinscheduler.extract.worker.IPhysicalTaskExecutorOperator;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import
org.apache.dolphinscheduler.server.master.cluster.loadbalancer.IWorkerLoadBalancer;
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import
org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
+import
org.apache.dolphinscheduler.server.master.exception.dispatch.TaskDispatchException;
+import
org.apache.dolphinscheduler.task.executor.eventbus.ITaskExecutorLifecycleEventReporter;
+import
org.apache.dolphinscheduler.task.executor.operations.TaskExecutorDispatchRequest;
+import
org.apache.dolphinscheduler.task.executor.operations.TaskExecutorDispatchResponse;
+import
org.apache.dolphinscheduler.task.executor.operations.TaskExecutorKillRequest;
+import
org.apache.dolphinscheduler.task.executor.operations.TaskExecutorKillResponse;
+import
org.apache.dolphinscheduler.task.executor.operations.TaskExecutorPauseRequest;
+import
org.apache.dolphinscheduler.task.executor.operations.TaskExecutorPauseResponse;
+import
org.apache.dolphinscheduler.task.executor.operations.TaskExecutorReassignMasterRequest;
+import
org.apache.dolphinscheduler.task.executor.operations.TaskExecutorReassignMasterResponse;
import org.apache.commons.lang3.StringUtils;
import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class PhysicalTaskExecutorClientDelegator implements
ITaskExecutorClientDelegator {
+ @Autowired
+ private MasterConfig masterConfig;
+
+ @Autowired
+ private IWorkerLoadBalancer workerLoadBalancer;
+
@Override
- public void dispatch(final ITaskExecutionRunnable taskExecutionRunnable) {
+ public void dispatch(final ITaskExecutionRunnable taskExecutionRunnable)
throws TaskDispatchException {
+ final TaskExecutionContext taskExecutionContext =
taskExecutionRunnable.getTaskExecutionContext();
+ final String taskName = taskExecutionContext.getTaskName();
+ final String physicalTaskExecutorAddress = workerLoadBalancer
+ .select(taskExecutionContext.getWorkerGroup())
+ .map(Host::of)
+ .map(Host::getAddress)
+ .orElseThrow(() -> new TaskDispatchException(
+ String.format("Cannot find the host to dispatch
Task[id=%s, name=%s]",
+ taskExecutionContext.getTaskInstanceId(),
taskName)));
+
+ taskExecutionContext.setHost(physicalTaskExecutorAddress);
+
taskExecutionRunnable.getTaskInstance().setHost(physicalTaskExecutorAddress);
+
+ try {
+ final TaskExecutorDispatchResponse taskExecutorDispatchResponse =
Clients
+ .withService(IPhysicalTaskExecutorOperator.class)
+ .withHost(physicalTaskExecutorAddress)
+
.dispatchTask(TaskExecutorDispatchRequest.of(taskExecutionRunnable.getTaskExecutionContext()));
+ if (!taskExecutorDispatchResponse.isDispatchSuccess()) {
+ throw new TaskDispatchException(
+ "Dispatch task: " + taskName + " to " +
physicalTaskExecutorAddress + " failed: "
+ + taskExecutorDispatchResponse);
+ }
+ } catch (TaskDispatchException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new TaskDispatchException(
+ "Dispatch task: " + taskName + " to " +
physicalTaskExecutorAddress + " failed", e);
+ }
+ }
+
+ @Override
+ public boolean reassignMasterHost(final ITaskExecutionRunnable
taskExecutionRunnable) {
+ final String taskName = taskExecutionRunnable.getName();
+ checkArgument(taskExecutionRunnable.isTaskInstanceInitialized(),
+ "Task " + taskName + "is not initialized cannot take-over");
+
+ final TaskInstance taskInstance =
taskExecutionRunnable.getTaskInstance();
+ final String taskExecutorHost = taskInstance.getHost();
+ if (StringUtils.isEmpty(taskExecutorHost)) {
+ log.debug(
+ "The task executor: {} host is empty, cannot take-over,
this might caused by the task hasn't dispatched",
+ taskName);
+ return false;
+ }
+ final TaskExecutorReassignMasterRequest
taskExecutorReassignMasterRequest =
+ TaskExecutorReassignMasterRequest.builder()
+ .taskInstanceId(taskInstance.getId())
+ .workflowHost(masterConfig.getMasterAddress())
+ .build();
+ final TaskExecutorReassignMasterResponse
taskExecutorReassignMasterResponse =
+ Clients
+ .withService(IPhysicalTaskExecutorOperator.class)
+ .withHost(taskInstance.getHost())
+
.reassignWorkflowInstanceHost(taskExecutorReassignMasterRequest);
+ boolean success = taskExecutorReassignMasterResponse.isSuccess();
+ if (success) {
+ log.info("Reassign master host {} to {} successfully",
taskExecutorHost, taskName);
+ } else {
+ log.info("Reassign master host {} on {} failed with response {}",
Review Comment:
This is expected during worker failover, so the log level is info. We don't
need to change it to error; doing so would trigger an alert if alerting is configured for error logs.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]