This is an automated email from the ASF dual-hosted git repository.

caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new ce34e21960 Task instance failure when worker group doesn't exist 
(#13448)
ce34e21960 is described below

commit ce34e21960fa8b80769701dbd999382b0e9b9e27
Author: Wenjun Ruan <[email protected]>
AuthorDate: Sun Jan 29 16:27:56 2023 +0800

    Task instance failure when worker group doesn't exist (#13448)
---
 .../master/consumer/TaskPriorityQueueConsumer.java | 126 ++++++++++++---------
 .../server/master/dispatch/ExecutorDispatcher.java |   6 +-
 .../master/dispatch/context/ExecutionContext.java  |  20 ++--
 .../WorkerGroupNotFoundException.java}             |  20 +---
 .../master/dispatch/executor/ExecutorManager.java  |   3 +-
 .../dispatch/executor/NettyExecutorManager.java    |  14 +--
 .../master/dispatch/host/CommonHostManager.java    |   6 +-
 .../server/master/dispatch/host/HostManager.java   |   7 +-
 .../dispatch/host/LowerWeightHostManager.java      |  32 +++---
 .../master/event/TaskResultEventHandler.java       |   8 +-
 .../server/master/processor/queue/TaskEvent.java   |   9 +-
 .../processor/queue/TaskExecuteRunnable.java       |   6 +-
 .../server/master/registry/ServerNodeManager.java  |   6 +-
 .../master/runner/StreamTaskExecuteRunnable.java   |   3 +-
 .../consumer/TaskPriorityQueueConsumerTest.java    |   8 +-
 .../executor/NettyExecutorManagerTest.java         |   5 +-
 .../dispatch/host/RoundRobinHostManagerTest.java   |   5 +-
 17 files changed, 162 insertions(+), 122 deletions(-)

diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
index db2b66a2b8..8af2f1e30a 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
@@ -19,14 +19,17 @@ package org.apache.dolphinscheduler.server.master.consumer;
 
 import org.apache.dolphinscheduler.common.constants.Constants;
 import org.apache.dolphinscheduler.common.enums.Flag;
+import org.apache.dolphinscheduler.common.enums.TaskEventType;
 import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
 import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
 import org.apache.dolphinscheduler.common.thread.ThreadUtils;
+import org.apache.dolphinscheduler.common.utils.DateUtils;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
 import org.apache.dolphinscheduler.dao.utils.TaskCacheUtils;
 import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.TaskDispatchCommand;
 import 
org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
@@ -35,6 +38,7 @@ import 
org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher;
 import 
org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
 import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
 import 
org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
+import 
org.apache.dolphinscheduler.server.master.dispatch.exceptions.WorkerGroupNotFoundException;
 import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics;
 import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
 import 
org.apache.dolphinscheduler.server.master.processor.queue.TaskEventService;
@@ -47,6 +51,7 @@ import org.apache.commons.collections4.CollectionUtils;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Date;
 import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
@@ -171,8 +176,17 @@ public class TaskPriorityQueueConsumer extends 
BaseDaemonThread {
 
             consumerThreadPoolExecutor.submit(() -> {
                 try {
-                    boolean dispatchResult = this.dispatchTask(taskPriority);
-                    if (!dispatchResult) {
+                    try {
+                        this.dispatchTask(taskPriority);
+                    } catch (WorkerGroupNotFoundException e) {
+                        // If the worker group is not found, we will not try to 
dispatch again.
+                        // The task instance will be marked as failed.
+                        // todo:
+                        addDispatchFailedEvent(taskPriority);
+                    } catch (ExecuteException e) {
+                        failedDispatchTasks.add(taskPriority);
+                    } catch (Exception e) {
+                        logger.error("Dispatch task error, meet an unknown 
exception", e);
                         failedDispatchTasks.add(taskPriority);
                     }
                 } finally {
@@ -193,60 +207,50 @@ public class TaskPriorityQueueConsumer extends 
BaseDaemonThread {
      * @param taskPriority taskPriority
      * @return dispatch result, return true if dispatch success, return false 
if dispatch failed.
      */
-    protected boolean dispatchTask(TaskPriority taskPriority) {
+    protected void dispatchTask(TaskPriority taskPriority) throws 
ExecuteException {
         TaskMetrics.incTaskDispatch();
-        boolean result = false;
-        try {
-            WorkflowExecuteRunnable workflowExecuteRunnable =
-                    
processInstanceExecCacheManager.getByProcessInstanceId(taskPriority.getProcessInstanceId());
-            if (workflowExecuteRunnable == null) {
-                logger.error("Cannot find the related processInstance of the 
task, taskPriority: {}", taskPriority);
-                return true;
-            }
-            Optional<TaskInstance> taskInstanceOptional =
-                    
workflowExecuteRunnable.getTaskInstance(taskPriority.getTaskId());
-            if (!taskInstanceOptional.isPresent()) {
-                logger.error("Cannot find the task instance from related 
processInstance, taskPriority: {}",
-                        taskPriority);
-                // we return true, so that we will drop this task.
-                return true;
-            }
-            TaskInstance taskInstance = taskInstanceOptional.get();
-            TaskExecutionContext context = 
taskPriority.getTaskExecutionContext();
-            ExecutionContext executionContext =
-                    new ExecutionContext(toCommand(context), 
ExecutorType.WORKER, context.getWorkerGroup(),
-                            taskInstance);
-
-            if (isTaskNeedToCheck(taskPriority)) {
-                if (taskInstanceIsFinalState(taskPriority.getTaskId())) {
-                    // when task finish, ignore this task, there is no need to 
dispatch anymore
-                    logger.info("Task {} is already finished, no need to 
dispatch, task instance id: {}",
-                            taskInstance.getName(), taskInstance.getId());
-                    return true;
-                }
-            }
-
-            // check task is cache execution, and decide whether to dispatch
-            if (checkIsCacheExecution(taskInstance, context)) {
-                return true;
+        WorkflowExecuteRunnable workflowExecuteRunnable =
+                
processInstanceExecCacheManager.getByProcessInstanceId(taskPriority.getProcessInstanceId());
+        if (workflowExecuteRunnable == null) {
+            logger.error("Cannot find the related processInstance of the task, 
taskPriority: {}", taskPriority);
+            return;
+        }
+        Optional<TaskInstance> taskInstanceOptional =
+                
workflowExecuteRunnable.getTaskInstance(taskPriority.getTaskId());
+        if (!taskInstanceOptional.isPresent()) {
+            logger.error("Cannot find the task instance from related 
processInstance, taskPriority: {}",
+                    taskPriority);
+            // we return directly, so that this task is dropped.
+            return;
+        }
+        TaskInstance taskInstance = taskInstanceOptional.get();
+        TaskExecutionContext context = taskPriority.getTaskExecutionContext();
+        ExecutionContext executionContext = ExecutionContext.builder()
+                .taskInstance(taskInstance)
+                .workerGroup(context.getWorkerGroup())
+                .executorType(ExecutorType.WORKER)
+                .command(toCommand(context))
+                .build();
+
+        if (isTaskNeedToCheck(taskPriority)) {
+            if (taskInstanceIsFinalState(taskPriority.getTaskId())) {
+                // when task finish, ignore this task, there is no need to 
dispatch anymore
+                logger.info("Task {} is already finished, no need to dispatch, 
task instance id: {}",
+                        taskInstance.getName(), taskInstance.getId());
+                return;
             }
+        }
 
-            result = dispatcher.dispatch(executionContext);
-
-            if (result) {
-                logger.info("Master success dispatch task to worker, 
taskInstanceId: {}, worker: {}",
-                        taskPriority.getTaskId(),
-                        executionContext.getHost());
-                addDispatchEvent(context, executionContext);
-            } else {
-                logger.info("Master failed to dispatch task to worker, 
taskInstanceId: {}, worker: {}",
-                        taskPriority.getTaskId(),
-                        executionContext.getHost());
-            }
-        } catch (RuntimeException | ExecuteException e) {
-            logger.error("Master dispatch task to worker error, taskPriority: 
{}", taskPriority, e);
+        // check task is cache execution, and decide whether to dispatch
+        if (checkIsCacheExecution(taskInstance, context)) {
+            return;
         }
-        return result;
+
+        dispatcher.dispatch(executionContext);
+        logger.info("Master success dispatch task to worker, taskInstanceId: 
{}, worker: {}",
+                taskPriority.getTaskId(),
+                executionContext.getHost());
+        addDispatchEvent(context, executionContext);
     }
 
     /**
@@ -258,6 +262,24 @@ public class TaskPriorityQueueConsumer extends 
BaseDaemonThread {
         taskEventService.addEvent(taskEvent);
     }
 
+    private void addDispatchFailedEvent(TaskPriority taskPriority) {
+        TaskExecutionContext taskExecutionContext = 
taskPriority.getTaskExecutionContext();
+        TaskEvent taskEvent = TaskEvent.builder()
+                .processInstanceId(taskPriority.getProcessInstanceId())
+                .taskInstanceId(taskPriority.getTaskId())
+                .state(TaskExecutionStatus.FAILURE)
+                .logPath(taskExecutionContext.getLogPath())
+                .executePath(taskExecutionContext.getExecutePath())
+                .appIds(taskExecutionContext.getAppIds())
+                .processId(taskExecutionContext.getProcessId())
+                .varPool(taskExecutionContext.getVarPool())
+                
.startTime(DateUtils.timeStampToDate(taskExecutionContext.getStartTime()))
+                .endTime(new Date())
+                .event(TaskEventType.RESULT)
+                .build();
+        taskEventService.addEvent(taskEvent);
+    }
+
     private Command toCommand(TaskExecutionContext taskExecutionContext) {
         // todo: we didn't set the host here, since right now we didn't need 
to retry this message.
         TaskDispatchCommand requestCommand = new 
TaskDispatchCommand(taskExecutionContext,
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java
index 3b1a0d4839..90cb07fbb2 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java
@@ -74,7 +74,7 @@ public class ExecutorDispatcher implements InitializingBean {
      * @return result
      * @throws ExecuteException if error throws ExecuteException
      */
-    public Boolean dispatch(final ExecutionContext context) throws 
ExecuteException {
+    public void dispatch(final ExecutionContext context) throws 
ExecuteException {
         // get executor manager
         ExecutorManager<Boolean> executorManager = 
this.executorManagers.get(context.getExecutorType());
         if (executorManager == null) {
@@ -86,13 +86,13 @@ public class ExecutorDispatcher implements InitializingBean 
{
         if (StringUtils.isEmpty(host.getAddress())) {
             logger.warn("fail to execute : {} due to no suitable worker, 
current task needs worker group {} to execute",
                     context.getCommand(), context.getWorkerGroup());
-            return false;
+            throw new ExecuteException("no suitable worker");
         }
         context.setHost(host);
         executorManager.beforeExecute(context);
         try {
             // task execute
-            return executorManager.execute(context);
+            executorManager.execute(context);
         } finally {
             executorManager.afterExecute(context);
         }
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java
index 567a961877..3294f04a01 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java
@@ -24,12 +24,15 @@ import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.utils.Host;
 import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
 
+import lombok.AllArgsConstructor;
+import lombok.Builder;
 import lombok.Data;
+import lombok.NoArgsConstructor;
 
-/**
- *  execution context
- */
 @Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
 public class ExecutionContext {
 
     /**
@@ -40,19 +43,16 @@ public class ExecutionContext {
     /**
      * command
      */
-    private final Command command;
+    private Command command;
 
-    private final TaskInstance taskInstance;
+    private TaskInstance taskInstance;
 
-    /**
-     * executor type : worker or client
-     */
-    private final ExecutorType executorType;
+    private ExecutorType executorType;
 
     /**
      * worker group
      */
-    private final String workerGroup;
+    private String workerGroup;
 
     public ExecutionContext(Command command, ExecutorType executorType, 
TaskInstance taskInstance) {
         this(command, executorType, DEFAULT_WORKER_GROUP, taskInstance);
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/HostManager.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/exceptions/WorkerGroupNotFoundException.java
similarity index 67%
copy from 
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/HostManager.java
copy to 
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/exceptions/WorkerGroupNotFoundException.java
index fccc31b648..81755152c3 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/HostManager.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/exceptions/WorkerGroupNotFoundException.java
@@ -15,21 +15,11 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.server.master.dispatch.host;
+package org.apache.dolphinscheduler.server.master.dispatch.exceptions;
 
-import org.apache.dolphinscheduler.remote.utils.Host;
-import 
org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
-
-/**
- *  host manager
- */
-public interface HostManager {
-
-    /**
-     *  select host
-     * @param context context
-     * @return host
-     */
-    Host select(ExecutionContext context);
+public class WorkerGroupNotFoundException extends ExecuteException {
 
+    public WorkerGroupNotFoundException(String workerGroup) {
+        super("Cannot find worker group: " + workerGroup);
+    }
 }
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/ExecutorManager.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/ExecutorManager.java
index 1e7754082c..9f1e02f151 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/ExecutorManager.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/ExecutorManager.java
@@ -35,11 +35,12 @@ public interface ExecutorManager<T> {
 
     /**
      * execute task
+     *
      * @param context context
      * @return T
      * @throws ExecuteException if error throws ExecuteException
      */
-    T execute(ExecutionContext context) throws ExecuteException;
+    void execute(ExecutionContext context) throws ExecuteException;
 
     /**
      * execute task directly without retry
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
index c29a4a81a7..cccf16ef72 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
@@ -27,6 +27,7 @@ import org.apache.dolphinscheduler.remote.utils.Host;
 import 
org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
 import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
 import 
org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
+import 
org.apache.dolphinscheduler.server.master.dispatch.exceptions.WorkerGroupNotFoundException;
 import 
org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor;
 import org.apache.dolphinscheduler.server.master.processor.TaskRecallProcessor;
 import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;
@@ -92,7 +93,7 @@ public class NettyExecutorManager extends 
AbstractExecutorManager<Boolean> {
      * @throws ExecuteException if error throws ExecuteException
      */
     @Override
-    public Boolean execute(ExecutionContext context) throws ExecuteException {
+    public void execute(ExecutionContext context) throws ExecuteException {
         // all nodes
         Set<String> allNodes = getAllNodes(context);
         // fail nodes
@@ -101,23 +102,22 @@ public class NettyExecutorManager extends 
AbstractExecutorManager<Boolean> {
         Command command = context.getCommand();
         // execute task host
         Host host = context.getHost();
-        boolean success = false;
-        while (!success) {
+        for (int i = 0; i < allNodes.size(); i++) {
             try {
                 doExecute(host, command);
-                success = true;
                 context.setHost(host);
                 // We set the host to taskInstance to avoid when the worker 
down, this taskInstance may not be
                 // failovered, due to the taskInstance's host
                 // is not belongs to the down worker ISSUE-10842.
                 context.getTaskInstance().setHost(host.getAddress());
+                return;
             } catch (ExecuteException ex) {
                 logger.error("Execute command {} error", command, ex);
                 try {
                     failNodeSet.add(host.getAddress());
                     Set<String> tmpAllIps = new HashSet<>(allNodes);
                     Collection<String> remained = 
CollectionUtils.subtract(tmpAllIps, failNodeSet);
-                    if (remained != null && remained.size() > 0) {
+                    if (CollectionUtils.isNotEmpty(remained)) {
                         host = Host.of(remained.iterator().next());
                         logger.error("retry execute command : {} host : {}", 
command, host);
                     } else {
@@ -128,8 +128,6 @@ public class NettyExecutorManager extends 
AbstractExecutorManager<Boolean> {
                 }
             }
         }
-
-        return success;
     }
 
     @Override
@@ -171,7 +169,7 @@ public class NettyExecutorManager extends 
AbstractExecutorManager<Boolean> {
      * @param context context
      * @return nodes
      */
-    private Set<String> getAllNodes(ExecutionContext context) {
+    private Set<String> getAllNodes(ExecutionContext context) throws 
WorkerGroupNotFoundException {
         Set<String> nodes = Collections.emptySet();
         /**
          * executor type
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java
index 160d611641..dd9b78fe37 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java
@@ -20,6 +20,7 @@ package 
org.apache.dolphinscheduler.server.master.dispatch.host;
 import org.apache.dolphinscheduler.remote.utils.Host;
 import 
org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
 import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
+import 
org.apache.dolphinscheduler.server.master.dispatch.exceptions.WorkerGroupNotFoundException;
 import 
org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostWorker;
 import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;
 
@@ -48,9 +49,10 @@ public abstract class CommonHostManager implements 
HostManager {
      *
      * @param context context
      * @return host
+     * @throws WorkerGroupNotFoundException If the worker group is not found
      */
     @Override
-    public Host select(ExecutionContext context) {
+    public Host select(ExecutionContext context) throws 
WorkerGroupNotFoundException {
         List<HostWorker> candidates = null;
         String workerGroup = context.getWorkerGroup();
         ExecutorType executorType = context.getExecutorType();
@@ -72,7 +74,7 @@ public abstract class CommonHostManager implements 
HostManager {
 
     protected abstract HostWorker select(Collection<HostWorker> nodes);
 
-    protected List<HostWorker> getWorkerCandidates(String workerGroup) {
+    protected List<HostWorker> getWorkerCandidates(String workerGroup) throws 
WorkerGroupNotFoundException {
         List<HostWorker> hostWorkers = new ArrayList<>();
         Set<String> nodes = serverNodeManager.getWorkerGroupNodes(workerGroup);
         if (CollectionUtils.isNotEmpty(nodes)) {
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/HostManager.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/HostManager.java
index fccc31b648..31de26bdb9 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/HostManager.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/HostManager.java
@@ -19,6 +19,7 @@ package 
org.apache.dolphinscheduler.server.master.dispatch.host;
 
 import org.apache.dolphinscheduler.remote.utils.Host;
 import 
org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
+import 
org.apache.dolphinscheduler.server.master.dispatch.exceptions.WorkerGroupNotFoundException;
 
 /**
  *  host manager
@@ -26,10 +27,12 @@ import 
org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionConte
 public interface HostManager {
 
     /**
-     *  select host
+     * select host
+     *
      * @param context context
      * @return host
+     * @throws WorkerGroupNotFoundException If the worker group does not exist
      */
-    Host select(ExecutionContext context);
+    Host select(ExecutionContext context) throws WorkerGroupNotFoundException;
 
 }
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
index 727d62587d..a6d920a61c 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
@@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.common.constants.Constants;
 import org.apache.dolphinscheduler.common.model.WorkerHeartBeat;
 import org.apache.dolphinscheduler.remote.utils.Host;
 import 
org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
+import 
org.apache.dolphinscheduler.server.master.dispatch.exceptions.WorkerGroupNotFoundException;
 import 
org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostWeight;
 import 
org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostWorker;
 import 
org.apache.dolphinscheduler.server.master.dispatch.host.assign.LowerWeightRoundRobin;
@@ -35,8 +36,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import javax.annotation.PostConstruct;
 
@@ -60,16 +60,15 @@ public class LowerWeightHostManager extends 
CommonHostManager {
      */
     private ConcurrentHashMap<String, Set<HostWeight>> workerHostWeightsMap;
 
-    /**
-     * worker group host lock
-     */
-    private Lock lock;
+    private final ReentrantReadWriteLock workerGroupLock = new 
ReentrantReadWriteLock();
+
+    private final ReentrantReadWriteLock.ReadLock workerGroupReadLock = 
workerGroupLock.readLock();
+    private final ReentrantReadWriteLock.WriteLock workerGroupWriteLock = 
workerGroupLock.writeLock();
 
     @PostConstruct
     public void init() {
         this.selector = new LowerWeightRoundRobin();
         this.workerHostWeightsMap = new ConcurrentHashMap<>();
-        this.lock = new ReentrantLock();
         serverNodeManager.addWorkerInfoChangeListener(new 
WorkerWeightListener());
     }
 
@@ -78,9 +77,10 @@ public class LowerWeightHostManager extends 
CommonHostManager {
      *
      * @param context context
      * @return host
+     * @throws WorkerGroupNotFoundException If the worker group is not found
      */
     @Override
-    public Host select(ExecutionContext context) {
+    public Host select(ExecutionContext context) throws 
WorkerGroupNotFoundException {
         Set<HostWeight> workerHostWeights = 
getWorkerHostWeights(context.getWorkerGroup());
         if (CollectionUtils.isNotEmpty(workerHostWeights)) {
             return selector.select(workerHostWeights).getHost();
@@ -130,12 +130,12 @@ public class LowerWeightHostManager extends 
CommonHostManager {
         }
 
         private void syncWorkerHostWeight(Map<String, Set<HostWeight>> 
workerHostWeights) {
-            lock.lock();
+            workerGroupWriteLock.lock();
             try {
                 workerHostWeightsMap.clear();
                 workerHostWeightsMap.putAll(workerHostWeights);
             } finally {
-                lock.unlock();
+                workerGroupWriteLock.unlock();
             }
         }
     }
@@ -165,12 +165,16 @@ public class LowerWeightHostManager extends 
CommonHostManager {
                         heartBeat.getStartupTime()));
     }
 
-    private Set<HostWeight> getWorkerHostWeights(String workerGroup) {
-        lock.lock();
+    private Set<HostWeight> getWorkerHostWeights(String workerGroup) throws 
WorkerGroupNotFoundException {
+        workerGroupReadLock.lock();
         try {
-            return workerHostWeightsMap.get(workerGroup);
+            Set<HostWeight> hostWeights = 
workerHostWeightsMap.get(workerGroup);
+            if (hostWeights == null) {
+                throw new WorkerGroupNotFoundException("Can not find worker 
group " + workerGroup);
+            }
+            return hostWeights;
         } finally {
-            lock.unlock();
+            workerGroupReadLock.unlock();
         }
     }
 
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskResultEventHandler.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskResultEventHandler.java
index a185037ae4..1d4b221313 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskResultEventHandler.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskResultEventHandler.java
@@ -36,6 +36,8 @@ import java.util.Optional;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
+import io.netty.channel.Channel;
+
 @Component
 public class TaskResultEventHandler implements TaskEventHandler {
 
@@ -113,13 +115,17 @@ public class TaskResultEventHandler implements 
TaskEventHandler {
     }
 
     public void sendAckToWorker(TaskEvent taskEvent) {
+        Channel channel = taskEvent.getChannel();
+        if (channel == null) {
+            return;
+        }
         // we didn't set the receiver address, since the ack doen's need to 
retry
         TaskExecuteAckCommand taskExecuteAckMessage = new 
TaskExecuteAckCommand(true,
                 taskEvent.getTaskInstanceId(),
                 masterConfig.getMasterAddress(),
                 taskEvent.getWorkerAddress(),
                 System.currentTimeMillis());
-        
taskEvent.getChannel().writeAndFlush(taskExecuteAckMessage.convert2Command());
+        channel.writeAndFlush(taskExecuteAckMessage.convert2Command());
     }
 
     @Override
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java
index 47248a4045..2573707ec0 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java
@@ -26,13 +26,16 @@ import 
org.apache.dolphinscheduler.remote.command.TaskRejectCommand;
 
 import java.util.Date;
 
+import lombok.AllArgsConstructor;
+import lombok.Builder;
 import lombok.Data;
+import lombok.NoArgsConstructor;
 import io.netty.channel.Channel;
 
-/**
- * task event
- */
 @Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
 public class TaskEvent {
 
     /**
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteRunnable.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteRunnable.java
index 0a33410f2e..f1b84fc8c8 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteRunnable.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteRunnable.java
@@ -18,10 +18,10 @@
 package org.apache.dolphinscheduler.server.master.processor.queue;
 
 import org.apache.dolphinscheduler.common.enums.TaskEventType;
+import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
 import org.apache.dolphinscheduler.server.master.event.TaskEventHandleError;
 import 
org.apache.dolphinscheduler.server.master.event.TaskEventHandleException;
 import org.apache.dolphinscheduler.server.master.event.TaskEventHandler;
-import org.apache.dolphinscheduler.service.utils.LoggerUtils;
 
 import java.util.Map;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -53,7 +53,7 @@ public class TaskExecuteRunnable implements Runnable {
             // we handle the task event belongs to one task serial, so if the 
event comes in wrong order,
             TaskEvent event = this.events.peek();
             try {
-                
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(event.getProcessInstanceId(), 
event.getTaskInstanceId());
+                
LogUtils.setWorkflowAndTaskInstanceIDMDC(event.getProcessInstanceId(), 
event.getTaskInstanceId());
                 logger.info("Handle task event begin: {}", event);
                 
taskEventHandlerMap.get(event.getEvent()).handleTaskEvent(event);
                 events.remove(event);
@@ -71,7 +71,7 @@ public class TaskExecuteRunnable implements Runnable {
                         event, unknownException);
                 events.remove(event);
             } finally {
-                LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
+                LogUtils.removeWorkflowAndTaskInstanceIdMDC();
             }
         }
     }
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
index adac739b06..c6151866c0 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
@@ -34,6 +34,7 @@ import 
org.apache.dolphinscheduler.registry.api.RegistryClient;
 import org.apache.dolphinscheduler.registry.api.SubscribeListener;
 import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+import 
org.apache.dolphinscheduler.server.master.dispatch.exceptions.WorkerGroupNotFoundException;
 import org.apache.dolphinscheduler.service.queue.MasterPriorityQueue;
 
 import org.apache.commons.collections4.CollectionUtils;
@@ -334,13 +335,16 @@ public class ServerNodeManager implements 
InitializingBean {
      * @param workerGroup workerGroup
      * @return worker nodes
      */
-    public Set<String> getWorkerGroupNodes(String workerGroup) {
+    public Set<String> getWorkerGroupNodes(String workerGroup) throws 
WorkerGroupNotFoundException {
         workerGroupReadLock.lock();
         try {
             if (StringUtils.isEmpty(workerGroup)) {
                 workerGroup = Constants.DEFAULT_WORKER_GROUP;
             }
             Set<String> nodes = workerGroupNodes.get(workerGroup);
+            if (nodes == null) {
+                throw new 
WorkerGroupNotFoundException(String.format("WorkerGroup: %s is invalidated", 
workerGroup));
+            }
             if (CollectionUtils.isEmpty(nodes)) {
                 return Collections.emptySet();
             }
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java
index 690198e9d6..d4761b04a9 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java
@@ -165,7 +165,8 @@ public class StreamTaskExecuteRunnable implements Runnable {
                 taskExecutionContext.getWorkerGroup(), taskInstance);
         Boolean dispatchSuccess = false;
         try {
-            dispatchSuccess = dispatcher.dispatch(executionContext);
+            dispatcher.dispatch(executionContext);
+            dispatchSuccess = true;
         } catch (ExecuteException e) {
             logger.error("Master dispatch task to worker error, 
taskInstanceId: {}, worker: {}",
                     taskInstance.getId(),
diff --git 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
index 0af8f73e7b..135e7d0073 100644
--- 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
+++ 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
@@ -30,6 +30,7 @@ import org.apache.dolphinscheduler.dao.entity.Tenant;
 import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
 import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
 import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher;
+import 
org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.apache.dolphinscheduler.service.queue.TaskPriority;
 import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
@@ -310,7 +311,12 @@ public class TaskPriorityQueueConsumerTest {
 
         TaskPriority taskPriority = new TaskPriority();
         taskPriority.setTaskId(1);
-        boolean res = taskPriorityQueueConsumer.dispatchTask(taskPriority);
+        boolean res = false;
+        try {
+            taskPriorityQueueConsumer.dispatchTask(taskPriority);
+        } catch (ExecuteException e) {
+            throw new RuntimeException(e);
+        }
 
         Assertions.assertFalse(res);
     }
diff --git 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java
 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java
index 9fa680fb24..f5be1ed86f 100644
--- 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java
+++ 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java
@@ -72,13 +72,12 @@ public class NettyExecutorManagerTest {
                 .create();
         ExecutionContext executionContext = new 
ExecutionContext(toCommand(context), ExecutorType.WORKER, taskInstance);
         
executionContext.setHost(Host.of(NetUtils.getAddr(serverConfig.getListenPort())));
-        Boolean execute = nettyExecutorManager.execute(executionContext);
-        Assertions.assertTrue(execute);
+        Assertions.assertDoesNotThrow(() -> 
nettyExecutorManager.execute(executionContext));
         nettyRemotingServer.close();
     }
 
     @Test
-    public void testExecuteWithException() throws ExecuteException {
+    public void testExecuteWithException() {
         TaskInstance taskInstance = Mockito.mock(TaskInstance.class);
         ProcessDefinition processDefinition = 
Mockito.mock(ProcessDefinition.class);
         ProcessInstance processInstance = new ProcessInstance();
diff --git 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManagerTest.java
 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManagerTest.java
index 69cab57c88..c2146928be 100644
--- 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManagerTest.java
+++ 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManagerTest.java
@@ -21,6 +21,7 @@ import 
org.apache.dolphinscheduler.common.model.WorkerHeartBeat;
 import org.apache.dolphinscheduler.remote.utils.Host;
 import 
org.apache.dolphinscheduler.server.master.dispatch.ExecutionContextTestUtils;
 import 
org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
+import 
org.apache.dolphinscheduler.server.master.dispatch.exceptions.WorkerGroupNotFoundException;
 import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;
 
 import java.util.Optional;
@@ -49,7 +50,7 @@ public class RoundRobinHostManagerTest {
     RoundRobinHostManager roundRobinHostManager;
 
     @Test
-    public void testSelectWithEmptyResult() {
+    public void testSelectWithEmptyResult() throws 
WorkerGroupNotFoundException {
         
Mockito.when(serverNodeManager.getWorkerGroupNodes("default")).thenReturn(null);
         ExecutionContext context = 
ExecutionContextTestUtils.getExecutionContext(10000);
         Host emptyHost = roundRobinHostManager.select(context);
@@ -57,7 +58,7 @@ public class RoundRobinHostManagerTest {
     }
 
     @Test
-    public void testSelectWithResult() {
+    public void testSelectWithResult() throws WorkerGroupNotFoundException {
         
Mockito.when(serverNodeManager.getWorkerGroupNodes("default")).thenReturn(Sets.newHashSet("192.168.1.1:22"));
         Mockito.when(serverNodeManager.getWorkerNodeInfo("192.168.1.1:22"))
                 .thenReturn(Optional.of(new WorkerHeartBeat()));


Reply via email to