This is an automated email from the ASF dual-hosted git repository.
caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new ce34e21960 Task instance failure when worker group doesn't exist
(#13448)
ce34e21960 is described below
commit ce34e21960fa8b80769701dbd999382b0e9b9e27
Author: Wenjun Ruan <[email protected]>
AuthorDate: Sun Jan 29 16:27:56 2023 +0800
Task instance failure when worker group doesn't exist (#13448)
---
.../master/consumer/TaskPriorityQueueConsumer.java | 126 ++++++++++++---------
.../server/master/dispatch/ExecutorDispatcher.java | 6 +-
.../master/dispatch/context/ExecutionContext.java | 20 ++--
.../WorkerGroupNotFoundException.java} | 20 +---
.../master/dispatch/executor/ExecutorManager.java | 3 +-
.../dispatch/executor/NettyExecutorManager.java | 14 +--
.../master/dispatch/host/CommonHostManager.java | 6 +-
.../server/master/dispatch/host/HostManager.java | 7 +-
.../dispatch/host/LowerWeightHostManager.java | 32 +++---
.../master/event/TaskResultEventHandler.java | 8 +-
.../server/master/processor/queue/TaskEvent.java | 9 +-
.../processor/queue/TaskExecuteRunnable.java | 6 +-
.../server/master/registry/ServerNodeManager.java | 6 +-
.../master/runner/StreamTaskExecuteRunnable.java | 3 +-
.../consumer/TaskPriorityQueueConsumerTest.java | 8 +-
.../executor/NettyExecutorManagerTest.java | 5 +-
.../dispatch/host/RoundRobinHostManagerTest.java | 5 +-
17 files changed, 162 insertions(+), 122 deletions(-)
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
index db2b66a2b8..8af2f1e30a 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
@@ -19,14 +19,17 @@ package org.apache.dolphinscheduler.server.master.consumer;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.Flag;
+import org.apache.dolphinscheduler.common.enums.TaskEventType;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
+import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.dao.utils.TaskCacheUtils;
import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.TaskDispatchCommand;
import
org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
@@ -35,6 +38,7 @@ import
org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher;
import
org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
import
org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
+import
org.apache.dolphinscheduler.server.master.dispatch.exceptions.WorkerGroupNotFoundException;
import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
import
org.apache.dolphinscheduler.server.master.processor.queue.TaskEventService;
@@ -47,6 +51,7 @@ import org.apache.commons.collections4.CollectionUtils;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
@@ -171,8 +176,17 @@ public class TaskPriorityQueueConsumer extends
BaseDaemonThread {
consumerThreadPoolExecutor.submit(() -> {
try {
- boolean dispatchResult = this.dispatchTask(taskPriority);
- if (!dispatchResult) {
+ try {
+ this.dispatchTask(taskPriority);
+ } catch (WorkerGroupNotFoundException e) {
+ // If the worker group is not found, we will not try to
dispatch again.
+ // The task instance will be marked as failed.
+ // todo:
+ addDispatchFailedEvent(taskPriority);
+ } catch (ExecuteException e) {
+ failedDispatchTasks.add(taskPriority);
+ } catch (Exception e) {
+ logger.error("Dispatch task error, meet an unknown
exception", e);
failedDispatchTasks.add(taskPriority);
}
} finally {
@@ -193,60 +207,50 @@ public class TaskPriorityQueueConsumer extends
BaseDaemonThread {
* @param taskPriority taskPriority
* @return dispatch result, return true if dispatch success, return false
if dispatch failed.
*/
- protected boolean dispatchTask(TaskPriority taskPriority) {
+ protected void dispatchTask(TaskPriority taskPriority) throws
ExecuteException {
TaskMetrics.incTaskDispatch();
- boolean result = false;
- try {
- WorkflowExecuteRunnable workflowExecuteRunnable =
-
processInstanceExecCacheManager.getByProcessInstanceId(taskPriority.getProcessInstanceId());
- if (workflowExecuteRunnable == null) {
- logger.error("Cannot find the related processInstance of the
task, taskPriority: {}", taskPriority);
- return true;
- }
- Optional<TaskInstance> taskInstanceOptional =
-
workflowExecuteRunnable.getTaskInstance(taskPriority.getTaskId());
- if (!taskInstanceOptional.isPresent()) {
- logger.error("Cannot find the task instance from related
processInstance, taskPriority: {}",
- taskPriority);
- // we return true, so that we will drop this task.
- return true;
- }
- TaskInstance taskInstance = taskInstanceOptional.get();
- TaskExecutionContext context =
taskPriority.getTaskExecutionContext();
- ExecutionContext executionContext =
- new ExecutionContext(toCommand(context),
ExecutorType.WORKER, context.getWorkerGroup(),
- taskInstance);
-
- if (isTaskNeedToCheck(taskPriority)) {
- if (taskInstanceIsFinalState(taskPriority.getTaskId())) {
- // when task finish, ignore this task, there is no need to
dispatch anymore
- logger.info("Task {} is already finished, no need to
dispatch, task instance id: {}",
- taskInstance.getName(), taskInstance.getId());
- return true;
- }
- }
-
- // check task is cache execution, and decide whether to dispatch
- if (checkIsCacheExecution(taskInstance, context)) {
- return true;
+ WorkflowExecuteRunnable workflowExecuteRunnable =
+
processInstanceExecCacheManager.getByProcessInstanceId(taskPriority.getProcessInstanceId());
+ if (workflowExecuteRunnable == null) {
+ logger.error("Cannot find the related processInstance of the task,
taskPriority: {}", taskPriority);
+ return;
+ }
+ Optional<TaskInstance> taskInstanceOptional =
+
workflowExecuteRunnable.getTaskInstance(taskPriority.getTaskId());
+ if (!taskInstanceOptional.isPresent()) {
+ logger.error("Cannot find the task instance from related
processInstance, taskPriority: {}",
+ taskPriority);
+ // return directly, so that this task will be dropped.
+ return;
+ }
+ TaskInstance taskInstance = taskInstanceOptional.get();
+ TaskExecutionContext context = taskPriority.getTaskExecutionContext();
+ ExecutionContext executionContext = ExecutionContext.builder()
+ .taskInstance(taskInstance)
+ .workerGroup(context.getWorkerGroup())
+ .executorType(ExecutorType.WORKER)
+ .command(toCommand(context))
+ .build();
+
+ if (isTaskNeedToCheck(taskPriority)) {
+ if (taskInstanceIsFinalState(taskPriority.getTaskId())) {
+ // when task finish, ignore this task, there is no need to
dispatch anymore
+ logger.info("Task {} is already finished, no need to dispatch,
task instance id: {}",
+ taskInstance.getName(), taskInstance.getId());
+ return;
}
+ }
- result = dispatcher.dispatch(executionContext);
-
- if (result) {
- logger.info("Master success dispatch task to worker,
taskInstanceId: {}, worker: {}",
- taskPriority.getTaskId(),
- executionContext.getHost());
- addDispatchEvent(context, executionContext);
- } else {
- logger.info("Master failed to dispatch task to worker,
taskInstanceId: {}, worker: {}",
- taskPriority.getTaskId(),
- executionContext.getHost());
- }
- } catch (RuntimeException | ExecuteException e) {
- logger.error("Master dispatch task to worker error, taskPriority:
{}", taskPriority, e);
+ // check task is cache execution, and decide whether to dispatch
+ if (checkIsCacheExecution(taskInstance, context)) {
+ return;
}
- return result;
+
+ dispatcher.dispatch(executionContext);
+ logger.info("Master success dispatch task to worker, taskInstanceId:
{}, worker: {}",
+ taskPriority.getTaskId(),
+ executionContext.getHost());
+ addDispatchEvent(context, executionContext);
}
/**
@@ -258,6 +262,24 @@ public class TaskPriorityQueueConsumer extends
BaseDaemonThread {
taskEventService.addEvent(taskEvent);
}
+ private void addDispatchFailedEvent(TaskPriority taskPriority) {
+ TaskExecutionContext taskExecutionContext =
taskPriority.getTaskExecutionContext();
+ TaskEvent taskEvent = TaskEvent.builder()
+ .processInstanceId(taskPriority.getProcessInstanceId())
+ .taskInstanceId(taskPriority.getTaskId())
+ .state(TaskExecutionStatus.FAILURE)
+ .logPath(taskExecutionContext.getLogPath())
+ .executePath(taskExecutionContext.getExecutePath())
+ .appIds(taskExecutionContext.getAppIds())
+ .processId(taskExecutionContext.getProcessId())
+ .varPool(taskExecutionContext.getVarPool())
+
.startTime(DateUtils.timeStampToDate(taskExecutionContext.getStartTime()))
+ .endTime(new Date())
+ .event(TaskEventType.RESULT)
+ .build();
+ taskEventService.addEvent(taskEvent);
+ }
+
private Command toCommand(TaskExecutionContext taskExecutionContext) {
// todo: we didn't set the host here, since right now we didn't need
to retry this message.
TaskDispatchCommand requestCommand = new
TaskDispatchCommand(taskExecutionContext,
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java
index 3b1a0d4839..90cb07fbb2 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java
@@ -74,7 +74,7 @@ public class ExecutorDispatcher implements InitializingBean {
* @return result
* @throws ExecuteException if error throws ExecuteException
*/
- public Boolean dispatch(final ExecutionContext context) throws
ExecuteException {
+ public void dispatch(final ExecutionContext context) throws
ExecuteException {
// get executor manager
ExecutorManager<Boolean> executorManager =
this.executorManagers.get(context.getExecutorType());
if (executorManager == null) {
@@ -86,13 +86,13 @@ public class ExecutorDispatcher implements InitializingBean
{
if (StringUtils.isEmpty(host.getAddress())) {
logger.warn("fail to execute : {} due to no suitable worker,
current task needs worker group {} to execute",
context.getCommand(), context.getWorkerGroup());
- return false;
+ throw new ExecuteException("no suitable worker");
}
context.setHost(host);
executorManager.beforeExecute(context);
try {
// task execute
- return executorManager.execute(context);
+ executorManager.execute(context);
} finally {
executorManager.afterExecute(context);
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java
index 567a961877..3294f04a01 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java
@@ -24,12 +24,15 @@ import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
import lombok.Data;
+import lombok.NoArgsConstructor;
-/**
- * execution context
- */
@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
public class ExecutionContext {
/**
@@ -40,19 +43,16 @@ public class ExecutionContext {
/**
* command
*/
- private final Command command;
+ private Command command;
- private final TaskInstance taskInstance;
+ private TaskInstance taskInstance;
- /**
- * executor type : worker or client
- */
- private final ExecutorType executorType;
+ private ExecutorType executorType;
/**
* worker group
*/
- private final String workerGroup;
+ private String workerGroup;
public ExecutionContext(Command command, ExecutorType executorType,
TaskInstance taskInstance) {
this(command, executorType, DEFAULT_WORKER_GROUP, taskInstance);
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/HostManager.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/exceptions/WorkerGroupNotFoundException.java
similarity index 67%
copy from
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/HostManager.java
copy to
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/exceptions/WorkerGroupNotFoundException.java
index fccc31b648..81755152c3 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/HostManager.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/exceptions/WorkerGroupNotFoundException.java
@@ -15,21 +15,11 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.server.master.dispatch.host;
+package org.apache.dolphinscheduler.server.master.dispatch.exceptions;
-import org.apache.dolphinscheduler.remote.utils.Host;
-import
org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
-
-/**
- * host manager
- */
-public interface HostManager {
-
- /**
- * select host
- * @param context context
- * @return host
- */
- Host select(ExecutionContext context);
+public class WorkerGroupNotFoundException extends ExecuteException {
+ public WorkerGroupNotFoundException(String workerGroup) {
+ super("Cannot find worker group: " + workerGroup);
+ }
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/ExecutorManager.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/ExecutorManager.java
index 1e7754082c..9f1e02f151 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/ExecutorManager.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/ExecutorManager.java
@@ -35,11 +35,12 @@ public interface ExecutorManager<T> {
/**
* execute task
+ *
* @param context context
* @return T
* @throws ExecuteException if error throws ExecuteException
*/
- T execute(ExecutionContext context) throws ExecuteException;
+ void execute(ExecutionContext context) throws ExecuteException;
/**
* execute task directly without retry
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
index c29a4a81a7..cccf16ef72 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
@@ -27,6 +27,7 @@ import org.apache.dolphinscheduler.remote.utils.Host;
import
org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
import
org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
+import
org.apache.dolphinscheduler.server.master.dispatch.exceptions.WorkerGroupNotFoundException;
import
org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskRecallProcessor;
import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;
@@ -92,7 +93,7 @@ public class NettyExecutorManager extends
AbstractExecutorManager<Boolean> {
* @throws ExecuteException if error throws ExecuteException
*/
@Override
- public Boolean execute(ExecutionContext context) throws ExecuteException {
+ public void execute(ExecutionContext context) throws ExecuteException {
// all nodes
Set<String> allNodes = getAllNodes(context);
// fail nodes
@@ -101,23 +102,22 @@ public class NettyExecutorManager extends
AbstractExecutorManager<Boolean> {
Command command = context.getCommand();
// execute task host
Host host = context.getHost();
- boolean success = false;
- while (!success) {
+ for (int i = 0; i < allNodes.size(); i++) {
try {
doExecute(host, command);
- success = true;
context.setHost(host);
// We set the host to taskInstance to avoid when the worker
down, this taskInstance may not be
// failovered, due to the taskInstance's host
// is not belongs to the down worker ISSUE-10842.
context.getTaskInstance().setHost(host.getAddress());
+ return;
} catch (ExecuteException ex) {
logger.error("Execute command {} error", command, ex);
try {
failNodeSet.add(host.getAddress());
Set<String> tmpAllIps = new HashSet<>(allNodes);
Collection<String> remained =
CollectionUtils.subtract(tmpAllIps, failNodeSet);
- if (remained != null && remained.size() > 0) {
+ if (CollectionUtils.isNotEmpty(remained)) {
host = Host.of(remained.iterator().next());
logger.error("retry execute command : {} host : {}",
command, host);
} else {
@@ -128,8 +128,6 @@ public class NettyExecutorManager extends
AbstractExecutorManager<Boolean> {
}
}
}
-
- return success;
}
@Override
@@ -171,7 +169,7 @@ public class NettyExecutorManager extends
AbstractExecutorManager<Boolean> {
* @param context context
* @return nodes
*/
- private Set<String> getAllNodes(ExecutionContext context) {
+ private Set<String> getAllNodes(ExecutionContext context) throws
WorkerGroupNotFoundException {
Set<String> nodes = Collections.emptySet();
/**
* executor type
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java
index 160d611641..dd9b78fe37 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java
@@ -20,6 +20,7 @@ package
org.apache.dolphinscheduler.server.master.dispatch.host;
import org.apache.dolphinscheduler.remote.utils.Host;
import
org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
+import
org.apache.dolphinscheduler.server.master.dispatch.exceptions.WorkerGroupNotFoundException;
import
org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostWorker;
import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;
@@ -48,9 +49,10 @@ public abstract class CommonHostManager implements
HostManager {
*
* @param context context
* @return host
+ * @throws WorkerGroupNotFoundException If the worker group is not found
*/
@Override
- public Host select(ExecutionContext context) {
+ public Host select(ExecutionContext context) throws
WorkerGroupNotFoundException {
List<HostWorker> candidates = null;
String workerGroup = context.getWorkerGroup();
ExecutorType executorType = context.getExecutorType();
@@ -72,7 +74,7 @@ public abstract class CommonHostManager implements
HostManager {
protected abstract HostWorker select(Collection<HostWorker> nodes);
- protected List<HostWorker> getWorkerCandidates(String workerGroup) {
+ protected List<HostWorker> getWorkerCandidates(String workerGroup) throws
WorkerGroupNotFoundException {
List<HostWorker> hostWorkers = new ArrayList<>();
Set<String> nodes = serverNodeManager.getWorkerGroupNodes(workerGroup);
if (CollectionUtils.isNotEmpty(nodes)) {
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/HostManager.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/HostManager.java
index fccc31b648..31de26bdb9 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/HostManager.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/HostManager.java
@@ -19,6 +19,7 @@ package
org.apache.dolphinscheduler.server.master.dispatch.host;
import org.apache.dolphinscheduler.remote.utils.Host;
import
org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
+import
org.apache.dolphinscheduler.server.master.dispatch.exceptions.WorkerGroupNotFoundException;
/**
* host manager
@@ -26,10 +27,12 @@ import
org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionConte
public interface HostManager {
/**
- * select host
+ * select host
+ *
* @param context context
* @return host
+ * @throws WorkerGroupNotFoundException If the worker group does not exist
*/
- Host select(ExecutionContext context);
+ Host select(ExecutionContext context) throws WorkerGroupNotFoundException;
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
index 727d62587d..a6d920a61c 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
@@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.model.WorkerHeartBeat;
import org.apache.dolphinscheduler.remote.utils.Host;
import
org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
+import
org.apache.dolphinscheduler.server.master.dispatch.exceptions.WorkerGroupNotFoundException;
import
org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostWeight;
import
org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostWorker;
import
org.apache.dolphinscheduler.server.master.dispatch.host.assign.LowerWeightRoundRobin;
@@ -35,8 +36,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.annotation.PostConstruct;
@@ -60,16 +60,15 @@ public class LowerWeightHostManager extends
CommonHostManager {
*/
private ConcurrentHashMap<String, Set<HostWeight>> workerHostWeightsMap;
- /**
- * worker group host lock
- */
- private Lock lock;
+ private final ReentrantReadWriteLock workerGroupLock = new
ReentrantReadWriteLock();
+
+ private final ReentrantReadWriteLock.ReadLock workerGroupReadLock =
workerGroupLock.readLock();
+ private final ReentrantReadWriteLock.WriteLock workerGroupWriteLock =
workerGroupLock.writeLock();
@PostConstruct
public void init() {
this.selector = new LowerWeightRoundRobin();
this.workerHostWeightsMap = new ConcurrentHashMap<>();
- this.lock = new ReentrantLock();
serverNodeManager.addWorkerInfoChangeListener(new
WorkerWeightListener());
}
@@ -78,9 +77,10 @@ public class LowerWeightHostManager extends
CommonHostManager {
*
* @param context context
* @return host
+ * @throws WorkerGroupNotFoundException If the worker group is not found
*/
@Override
- public Host select(ExecutionContext context) {
+ public Host select(ExecutionContext context) throws
WorkerGroupNotFoundException {
Set<HostWeight> workerHostWeights =
getWorkerHostWeights(context.getWorkerGroup());
if (CollectionUtils.isNotEmpty(workerHostWeights)) {
return selector.select(workerHostWeights).getHost();
@@ -130,12 +130,12 @@ public class LowerWeightHostManager extends
CommonHostManager {
}
private void syncWorkerHostWeight(Map<String, Set<HostWeight>>
workerHostWeights) {
- lock.lock();
+ workerGroupWriteLock.lock();
try {
workerHostWeightsMap.clear();
workerHostWeightsMap.putAll(workerHostWeights);
} finally {
- lock.unlock();
+ workerGroupWriteLock.unlock();
}
}
}
@@ -165,12 +165,16 @@ public class LowerWeightHostManager extends
CommonHostManager {
heartBeat.getStartupTime()));
}
- private Set<HostWeight> getWorkerHostWeights(String workerGroup) {
- lock.lock();
+ private Set<HostWeight> getWorkerHostWeights(String workerGroup) throws
WorkerGroupNotFoundException {
+ workerGroupReadLock.lock();
try {
- return workerHostWeightsMap.get(workerGroup);
+ Set<HostWeight> hostWeights =
workerHostWeightsMap.get(workerGroup);
+ if (hostWeights == null) {
+ throw new WorkerGroupNotFoundException("Can not find worker
group " + workerGroup);
+ }
+ return hostWeights;
} finally {
- lock.unlock();
+ workerGroupReadLock.unlock();
}
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskResultEventHandler.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskResultEventHandler.java
index a185037ae4..1d4b221313 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskResultEventHandler.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskResultEventHandler.java
@@ -36,6 +36,8 @@ import java.util.Optional;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
+import io.netty.channel.Channel;
+
@Component
public class TaskResultEventHandler implements TaskEventHandler {
@@ -113,13 +115,17 @@ public class TaskResultEventHandler implements
TaskEventHandler {
}
public void sendAckToWorker(TaskEvent taskEvent) {
+ Channel channel = taskEvent.getChannel();
+ if (channel == null) {
+ return;
+ }
// we didn't set the receiver address, since the ack doen's need to
retry
TaskExecuteAckCommand taskExecuteAckMessage = new
TaskExecuteAckCommand(true,
taskEvent.getTaskInstanceId(),
masterConfig.getMasterAddress(),
taskEvent.getWorkerAddress(),
System.currentTimeMillis());
-
taskEvent.getChannel().writeAndFlush(taskExecuteAckMessage.convert2Command());
+ channel.writeAndFlush(taskExecuteAckMessage.convert2Command());
}
@Override
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java
index 47248a4045..2573707ec0 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java
@@ -26,13 +26,16 @@ import
org.apache.dolphinscheduler.remote.command.TaskRejectCommand;
import java.util.Date;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
import lombok.Data;
+import lombok.NoArgsConstructor;
import io.netty.channel.Channel;
-/**
- * task event
- */
@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
public class TaskEvent {
/**
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteRunnable.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteRunnable.java
index 0a33410f2e..f1b84fc8c8 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteRunnable.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteRunnable.java
@@ -18,10 +18,10 @@
package org.apache.dolphinscheduler.server.master.processor.queue;
import org.apache.dolphinscheduler.common.enums.TaskEventType;
+import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.server.master.event.TaskEventHandleError;
import
org.apache.dolphinscheduler.server.master.event.TaskEventHandleException;
import org.apache.dolphinscheduler.server.master.event.TaskEventHandler;
-import org.apache.dolphinscheduler.service.utils.LoggerUtils;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -53,7 +53,7 @@ public class TaskExecuteRunnable implements Runnable {
// we handle the task event belongs to one task serial, so if the
event comes in wrong order,
TaskEvent event = this.events.peek();
try {
-
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(event.getProcessInstanceId(),
event.getTaskInstanceId());
+
LogUtils.setWorkflowAndTaskInstanceIDMDC(event.getProcessInstanceId(),
event.getTaskInstanceId());
logger.info("Handle task event begin: {}", event);
taskEventHandlerMap.get(event.getEvent()).handleTaskEvent(event);
events.remove(event);
@@ -71,7 +71,7 @@ public class TaskExecuteRunnable implements Runnable {
event, unknownException);
events.remove(event);
} finally {
- LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
+ LogUtils.removeWorkflowAndTaskInstanceIdMDC();
}
}
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
index adac739b06..c6151866c0 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
@@ -34,6 +34,7 @@ import
org.apache.dolphinscheduler.registry.api.RegistryClient;
import org.apache.dolphinscheduler.registry.api.SubscribeListener;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+import
org.apache.dolphinscheduler.server.master.dispatch.exceptions.WorkerGroupNotFoundException;
import org.apache.dolphinscheduler.service.queue.MasterPriorityQueue;
import org.apache.commons.collections4.CollectionUtils;
@@ -334,13 +335,16 @@ public class ServerNodeManager implements
InitializingBean {
* @param workerGroup workerGroup
* @return worker nodes
*/
- public Set<String> getWorkerGroupNodes(String workerGroup) {
+ public Set<String> getWorkerGroupNodes(String workerGroup) throws
WorkerGroupNotFoundException {
workerGroupReadLock.lock();
try {
if (StringUtils.isEmpty(workerGroup)) {
workerGroup = Constants.DEFAULT_WORKER_GROUP;
}
Set<String> nodes = workerGroupNodes.get(workerGroup);
+ if (nodes == null) {
+ throw new
WorkerGroupNotFoundException(String.format("WorkerGroup: %s is invalidated",
workerGroup));
+ }
if (CollectionUtils.isEmpty(nodes)) {
return Collections.emptySet();
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java
index 690198e9d6..d4761b04a9 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java
@@ -165,7 +165,8 @@ public class StreamTaskExecuteRunnable implements Runnable {
taskExecutionContext.getWorkerGroup(), taskInstance);
Boolean dispatchSuccess = false;
try {
- dispatchSuccess = dispatcher.dispatch(executionContext);
+ dispatcher.dispatch(executionContext);
+ dispatchSuccess = true;
} catch (ExecuteException e) {
logger.error("Master dispatch task to worker error,
taskInstanceId: {}, worker: {}",
taskInstance.getId(),
diff --git
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
index 0af8f73e7b..135e7d0073 100644
---
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
+++
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
@@ -30,6 +30,7 @@ import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher;
+import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.TaskPriority;
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
@@ -310,7 +311,12 @@ public class TaskPriorityQueueConsumerTest {
TaskPriority taskPriority = new TaskPriority();
taskPriority.setTaskId(1);
- boolean res = taskPriorityQueueConsumer.dispatchTask(taskPriority);
+ boolean res = false;
+ try {
+ taskPriorityQueueConsumer.dispatchTask(taskPriority);
+ } catch (ExecuteException e) {
+ throw new RuntimeException(e);
+ }
Assertions.assertFalse(res);
}
diff --git
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java
index 9fa680fb24..f5be1ed86f 100644
---
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java
+++
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java
@@ -72,13 +72,12 @@ public class NettyExecutorManagerTest {
.create();
ExecutionContext executionContext = new
ExecutionContext(toCommand(context), ExecutorType.WORKER, taskInstance);
executionContext.setHost(Host.of(NetUtils.getAddr(serverConfig.getListenPort())));
- Boolean execute = nettyExecutorManager.execute(executionContext);
- Assertions.assertTrue(execute);
+ Assertions.assertDoesNotThrow(() -> nettyExecutorManager.execute(executionContext));
nettyRemotingServer.close();
}
@Test
- public void testExecuteWithException() throws ExecuteException {
+ public void testExecuteWithException() {
TaskInstance taskInstance = Mockito.mock(TaskInstance.class);
ProcessDefinition processDefinition =
Mockito.mock(ProcessDefinition.class);
ProcessInstance processInstance = new ProcessInstance();
diff --git
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManagerTest.java
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManagerTest.java
index 69cab57c88..c2146928be 100644
---
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManagerTest.java
+++
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManagerTest.java
@@ -21,6 +21,7 @@ import
org.apache.dolphinscheduler.common.model.WorkerHeartBeat;
import org.apache.dolphinscheduler.remote.utils.Host;
import
org.apache.dolphinscheduler.server.master.dispatch.ExecutionContextTestUtils;
import
org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
+import org.apache.dolphinscheduler.server.master.dispatch.exceptions.WorkerGroupNotFoundException;
import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;
import java.util.Optional;
@@ -49,7 +50,7 @@ public class RoundRobinHostManagerTest {
RoundRobinHostManager roundRobinHostManager;
@Test
- public void testSelectWithEmptyResult() {
+ public void testSelectWithEmptyResult() throws WorkerGroupNotFoundException {
Mockito.when(serverNodeManager.getWorkerGroupNodes("default")).thenReturn(null);
ExecutionContext context =
ExecutionContextTestUtils.getExecutionContext(10000);
Host emptyHost = roundRobinHostManager.select(context);
@@ -57,7 +58,7 @@ public class RoundRobinHostManagerTest {
}
@Test
- public void testSelectWithResult() {
+ public void testSelectWithResult() throws WorkerGroupNotFoundException {
Mockito.when(serverNodeManager.getWorkerGroupNodes("default")).thenReturn(Sets.newHashSet("192.168.1.1:22"));
Mockito.when(serverNodeManager.getWorkerNodeInfo("192.168.1.1:22"))
.thenReturn(Optional.of(new WorkerHeartBeat()));