This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new ad2646ff1f Fix TaskProcessorFactory#getTaskProcessor returning a shared common
processor instance, which is not thread safe (#10479)
ad2646ff1f is described below
commit ad2646ff1f7baa5d76d29023ced2c28a89b52f6b
Author: Wenjun Ruan <[email protected]>
AuthorDate: Thu Jun 16 21:46:18 2022 +0800
Fix TaskProcessorFactory#getTaskProcessor returning a shared common processor
instance, which is not thread safe (#10479)
* Fix TaskProcessorFactory#getTaskProcessor returning a shared common processor
instance, which is not thread safe
---
.../dolphinscheduler/common/utils/FileUtils.java | 29 +++++-
.../dao/entity/ProcessInstance.java | 12 +--
.../master/consumer/TaskPriorityQueueConsumer.java | 8 +-
.../server/master/dispatch/ExecutorDispatcher.java | 2 -
.../server/master/processor/queue/TaskEvent.java | 106 +--------------------
.../processor/queue/TaskExecuteThreadPool.java | 22 +++--
.../master/runner/WorkflowExecuteRunnable.java | 66 +++++++------
.../master/runner/WorkflowExecuteThreadPool.java | 2 +-
.../master/runner/task/CommonTaskProcessor.java | 4 +-
.../master/runner/task/TaskProcessorFactory.java | 23 +++--
.../runner/task/TaskProcessorFactoryTest.java | 4 +-
.../remote/utils/ChannelUtils.java | 12 ++-
.../apache/dolphinscheduler/remote/utils/Host.java | 2 +
.../service/process/ProcessServiceImpl.java | 5 +-
.../queue/PeerTaskInstancePriorityQueue.java | 35 +++----
.../queue/PeerTaskInstancePriorityQueueTest.java | 9 +-
.../src/main/resources/application.yaml | 4 +-
.../plugin/task/shell/ShellTask.java | 7 +-
.../server/worker/config/WorkerConfig.java | 4 +-
.../worker/processor/TaskCallbackService.java | 2 +
.../server/worker/runner/TaskExecuteThread.java | 28 ++++--
21 files changed, 173 insertions(+), 213 deletions(-)
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java
index bdcf62f76a..23e4b74b75 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java
@@ -17,14 +17,25 @@
package org.apache.dolphinscheduler.common.utils;
+import static org.apache.dolphinscheduler.common.Constants.DATA_BASEDIR_PATH;
+import static org.apache.dolphinscheduler.common.Constants.FOLDER_SEPARATOR;
+import static
org.apache.dolphinscheduler.common.Constants.RESOURCE_VIEW_SUFFIXES;
+import static
org.apache.dolphinscheduler.common.Constants.RESOURCE_VIEW_SUFFIXES_DEFAULT_VALUE;
+import static org.apache.dolphinscheduler.common.Constants.UTF_8;
+import static org.apache.dolphinscheduler.common.Constants.YYYYMMDDHHMMSS;
+
import org.apache.commons.io.IOUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.io.*;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
import java.nio.charset.StandardCharsets;
+import java.nio.file.NoSuchFileException;
-import static org.apache.dolphinscheduler.common.Constants.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* file utils
@@ -112,7 +123,15 @@ public class FileUtils {
File execLocalPathFile = new File(execLocalPath);
if (execLocalPathFile.exists()) {
- org.apache.commons.io.FileUtils.forceDelete(execLocalPathFile);
+ try {
+ org.apache.commons.io.FileUtils.forceDelete(execLocalPathFile);
+ } catch (Exception ex) {
+ if (ex instanceof NoSuchFileException || ex.getCause()
instanceof NoSuchFileException) {
+ // this file is already be deleted.
+ } else {
+ throw ex;
+ }
+ }
}
//create work dir
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java
index 29971f94db..054df92e4f 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java
@@ -265,13 +265,11 @@ public class ProcessInstance {
*/
public ProcessInstance(ProcessDefinition processDefinition) {
this.processDefinition = processDefinition;
- this.name = processDefinition.getName()
- + "-"
- +
- processDefinition.getVersion()
- + "-"
- +
- DateUtils.getCurrentTimeStamp();
+ // todo: the name is not unique
+ this.name = String.join("-",
+ processDefinition.getName(),
+ String.valueOf(processDefinition.getVersion()),
+ DateUtils.getCurrentTimeStamp());
}
public String getVarPool() {
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
index c1b54df62f..d75595de1a 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
@@ -190,16 +190,18 @@ public class TaskPriorityQueueConsumer extends Thread {
return true;
}
}
-
result = dispatcher.dispatch(executionContext);
if (result) {
+ logger.info("Master success dispatch task to worker,
taskInstanceId: {}", taskPriority.getTaskId());
addDispatchEvent(context, executionContext);
+ } else {
+ logger.info("Master failed to dispatch task to worker,
taskInstanceId: {}", taskPriority.getTaskId());
}
} catch (RuntimeException e) {
- logger.error("dispatch error: ", e);
+ logger.error("Master dispatch task to worker error: ", e);
} catch (ExecuteException e) {
- logger.error("dispatch error: {}", e.getMessage());
+ logger.error("Master dispatch task to worker error: {}", e);
}
return result;
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java
index 25db4eb8a6..d439c28267 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java
@@ -17,8 +17,6 @@
package org.apache.dolphinscheduler.server.master.dispatch;
-import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.remote.utils.Host;
import
org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java
index 3ed41329fe..842bcaf333 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java
@@ -27,10 +27,12 @@ import
org.apache.dolphinscheduler.remote.utils.ChannelUtils;
import java.util.Date;
import io.netty.channel.Channel;
+import lombok.Data;
/**
* task event
*/
+@Data
public class TaskEvent {
/**
@@ -144,108 +146,4 @@ public class TaskEvent {
event.setEvent(Event.WORKER_REJECT);
return event;
}
-
- public String getVarPool() {
- return varPool;
- }
-
- public void setVarPool(String varPool) {
- this.varPool = varPool;
- }
-
- public int getTaskInstanceId() {
- return taskInstanceId;
- }
-
- public void setTaskInstanceId(int taskInstanceId) {
- this.taskInstanceId = taskInstanceId;
- }
-
- public String getWorkerAddress() {
- return workerAddress;
- }
-
- public void setWorkerAddress(String workerAddress) {
- this.workerAddress = workerAddress;
- }
-
- public ExecutionStatus getState() {
- return state;
- }
-
- public void setState(ExecutionStatus state) {
- this.state = state;
- }
-
- public Date getStartTime() {
- return startTime;
- }
-
- public void setStartTime(Date startTime) {
- this.startTime = startTime;
- }
-
- public Date getEndTime() {
- return endTime;
- }
-
- public void setEndTime(Date endTime) {
- this.endTime = endTime;
- }
-
- public String getExecutePath() {
- return executePath;
- }
-
- public void setExecutePath(String executePath) {
- this.executePath = executePath;
- }
-
- public String getLogPath() {
- return logPath;
- }
-
- public void setLogPath(String logPath) {
- this.logPath = logPath;
- }
-
- public int getProcessId() {
- return processId;
- }
-
- public void setProcessId(int processId) {
- this.processId = processId;
- }
-
- public String getAppIds() {
- return appIds;
- }
-
- public void setAppIds(String appIds) {
- this.appIds = appIds;
- }
-
- public Event getEvent() {
- return event;
- }
-
- public void setEvent(Event event) {
- this.event = event;
- }
-
- public Channel getChannel() {
- return channel;
- }
-
- public void setChannel(Channel channel) {
- this.channel = channel;
- }
-
- public int getProcessInstanceId() {
- return processInstanceId;
- }
-
- public void setProcessInstanceId(int processInstanceId) {
- this.processInstanceId = processInstanceId;
- }
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java
index 6bf3f862ae..323ea86411 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java
@@ -79,7 +79,7 @@ public class TaskExecuteThreadPool extends
ThreadPoolTaskExecutor {
public void submitTaskEvent(TaskEvent taskEvent) {
if
(!processInstanceExecCacheManager.contains(taskEvent.getProcessInstanceId())) {
- logger.warn("workflowExecuteThread is null, event: {}", taskEvent);
+ logger.warn("Cannot find workflowExecuteThread from cacheManager,
event: {}", taskEvent);
return;
}
if
(!taskExecuteThreadMap.containsKey(taskEvent.getProcessInstanceId())) {
@@ -114,20 +114,24 @@ public class TaskExecuteThreadPool extends
ThreadPoolTaskExecutor {
future.addCallback(new ListenableFutureCallback() {
@Override
public void onFailure(Throwable ex) {
- logger.error("handle event {} failed: {}",
taskExecuteThread.getProcessInstanceId(), ex);
- if
(!processInstanceExecCacheManager.contains(taskExecuteThread.getProcessInstanceId()))
{
-
taskExecuteThreadMap.remove(taskExecuteThread.getProcessInstanceId());
- logger.info("remove process instance: {}",
taskExecuteThread.getProcessInstanceId());
+ Integer processInstanceId =
taskExecuteThread.getProcessInstanceId();
+ logger.error("persist event failed processInstanceId: {}",
processInstanceId, ex);
+ if
(!processInstanceExecCacheManager.contains(processInstanceId)) {
+ taskExecuteThreadMap.remove(processInstanceId);
+ logger.info("Cannot find processInstance from
cacheManager, remove process instance from threadMap: {}",
+ processInstanceId);
}
multiThreadFilterMap.remove(taskExecuteThread.getKey());
}
@Override
public void onSuccess(Object result) {
- logger.info("persist events {} succeeded.",
taskExecuteThread.getProcessInstanceId());
- if
(!processInstanceExecCacheManager.contains(taskExecuteThread.getProcessInstanceId()))
{
-
taskExecuteThreadMap.remove(taskExecuteThread.getProcessInstanceId());
- logger.info("remove process instance: {}",
taskExecuteThread.getProcessInstanceId());
+ Integer processInstanceId =
taskExecuteThread.getProcessInstanceId();
+ logger.info("persist events succeeded, processInstanceId: {}",
processInstanceId);
+ if
(!processInstanceExecCacheManager.contains(processInstanceId)) {
+ taskExecuteThreadMap.remove(processInstanceId);
+ logger.info("Cannot find processInstance from
cacheManager, remove process instance from threadMap: {}",
+ processInstanceId);
}
multiThreadFilterMap.remove(taskExecuteThread.getKey());
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index 3365e67c35..ae627c44f5 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -17,10 +17,19 @@
package org.apache.dolphinscheduler.server.master.runner;
-import com.google.common.collect.Lists;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.math.NumberUtils;
+import static
org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
+import static
org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST;
+import static
org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
+import static
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING;
+import static
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
+import static
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES;
+import static org.apache.dolphinscheduler.common.Constants.COMMA;
+import static
org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
+import static org.apache.dolphinscheduler.common.Constants.YYYY_MM_DD_HH_MM_SS;
+import static
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_BLOCKING;
+import static
org.apache.dolphinscheduler.plugin.task.api.enums.DataType.VARCHAR;
+import static org.apache.dolphinscheduler.plugin.task.api.enums.Direct.IN;
+
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
@@ -69,8 +78,10 @@ import
org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
import org.apache.dolphinscheduler.service.corn.CronUtils;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.math.NumberUtils;
import java.util.ArrayList;
import java.util.Arrays;
@@ -88,18 +99,10 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
-import static
org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
-import static
org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST;
-import static
org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
-import static
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING;
-import static
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
-import static
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES;
-import static org.apache.dolphinscheduler.common.Constants.COMMA;
-import static
org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
-import static org.apache.dolphinscheduler.common.Constants.YYYY_MM_DD_HH_MM_SS;
-import static
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_BLOCKING;
-import static
org.apache.dolphinscheduler.plugin.task.api.enums.DataType.VARCHAR;
-import static org.apache.dolphinscheduler.plugin.task.api.enums.Direct.IN;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
/**
* Workflow execute task, used to execute a workflow instance.
@@ -1001,7 +1004,7 @@ public class WorkflowExecuteRunnable implements Runnable {
* @param taskInstance task instance
* @return TaskInstance
*/
- private TaskInstance submitTaskExec(TaskInstance taskInstance) {
+ private Optional<TaskInstance> submitTaskExec(TaskInstance taskInstance) {
try {
// package task instance before submit
processService.packageTaskInstance(taskInstance, processInstance);
@@ -1019,7 +1022,7 @@ public class WorkflowExecuteRunnable implements Runnable {
logger.error("process id:{} name:{} submit standby task id:{}
name:{} failed!",
processInstance.getId(), processInstance.getName(),
taskInstance.getId(), taskInstance.getName());
- return null;
+ return Optional.empty();
}
// in a dag, only one taskInstance is valid per taskCode, so need
to set the old taskInstance invalid
@@ -1058,10 +1061,10 @@ public class WorkflowExecuteRunnable implements
Runnable {
taskStateChangeEvent.setType(StateEventType.TASK_STATE_CHANGE);
this.stateEvents.add(taskStateChangeEvent);
}
- return taskInstance;
+ return Optional.of(taskInstance);
} catch (Exception e) {
logger.error("submit standby task error", e);
- return null;
+ return Optional.empty();
}
}
@@ -1360,6 +1363,7 @@ public class WorkflowExecuteRunnable implements Runnable {
for (TaskInstance task : taskInstances) {
if (readyToSubmitTaskQueue.contains(task)) {
+ logger.warn("Task is already at submit queue, taskInstanceId:
{}", task.getId());
continue;
}
@@ -1764,9 +1768,6 @@ public class WorkflowExecuteRunnable implements Runnable {
* @param taskInstance task instance
*/
private void removeTaskFromStandbyList(TaskInstance taskInstance) {
- logger.info("remove task from stand by list, id: {} name:{}",
- taskInstance.getId(),
- taskInstance.getName());
try {
readyToSubmitTaskQueue.remove(taskInstance);
} catch (Exception e) {
@@ -1859,14 +1860,14 @@ public class WorkflowExecuteRunnable implements
Runnable {
}
DependResult dependResult = getDependResultForTask(task);
if (DependResult.SUCCESS == dependResult) {
- TaskInstance taskInstance = submitTaskExec(task);
- if (taskInstance == null) {
+ Optional<TaskInstance> taskInstanceOptional =
submitTaskExec(task);
+ if (!taskInstanceOptional.isPresent()) {
this.taskFailedSubmit = true;
// Remove and add to complete map and error map
removeTaskFromStandbyList(task);
completeTaskMap.put(task.getTaskCode(), task.getId());
errorTaskMap.put(task.getTaskCode(), task.getId());
- logger.error("process {}, task {}, code:{} submit task
failed.", task.getProcessInstanceId(), task.getName(), task.getTaskCode());
+ logger.error("Task submitted failed,
processInstanceId: {}, taskInstanceId: {}", task.getProcessInstanceId(),
task.getId());
} else {
removeTaskFromStandbyList(task);
}
@@ -1874,11 +1875,11 @@ public class WorkflowExecuteRunnable implements
Runnable {
// if the dependency fails, the current node is not
submitted and the state changes to failure.
dependFailedTaskMap.put(task.getTaskCode(), task.getId());
removeTaskFromStandbyList(task);
- logger.info("task {},id:{} depend result : {}",
task.getName(), task.getId(), dependResult);
+ logger.info("Task dependent result is failed,
taskInstanceId:{} depend result : {}", task.getId(), dependResult);
} else if (DependResult.NON_EXEC == dependResult) {
// for some reasons(depend task pause/stop) this task
would not be submit
removeTaskFromStandbyList(task);
- logger.info("remove task {},id:{} , because depend result
: {}", task.getName(), task.getId(), dependResult);
+ logger.info("Remove task due to depend result not
executed, taskInstanceId:{} depend result : {}", task.getId(), dependResult);
}
}
} catch (Exception e) {
@@ -2067,6 +2068,11 @@ public class WorkflowExecuteRunnable implements Runnable
{
}
private void measureTaskState(StateEvent taskStateEvent) {
+ if (taskStateEvent == null || taskStateEvent.getExecutionStatus() ==
null) {
+ // the event is broken
+ logger.warn("The task event is broken..., taskEvent: {}",
taskStateEvent);
+ return;
+ }
if (taskStateEvent.getExecutionStatus().typeIsFinished()) {
TaskMetrics.incTaskFinish();
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
index 849c3a23b3..b658f669f7 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
@@ -105,7 +105,7 @@ public class WorkflowExecuteThreadPool extends
ThreadPoolTaskExecutor {
/**
* Handle the events belong to the given workflow.
*/
- public void executeEvent(WorkflowExecuteRunnable workflowExecuteThread) {
+ public void executeEvent(final WorkflowExecuteRunnable
workflowExecuteThread) {
if (!workflowExecuteThread.isStart() ||
workflowExecuteThread.eventSize() == 0) {
return;
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
index fd803ac371..fcfe7c67bf 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
@@ -119,7 +119,7 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
logger.info("submit task, but the status of the task {} is
already running or delayed.", taskInstance.getName());
return true;
}
- logger.debug("task ready to submit: {}", taskInstance.getName());
+ logger.info("task ready to submit: taskInstanceId: {}",
taskInstance.getId());
TaskPriority taskPriority = new
TaskPriority(processInstance.getProcessInstancePriority().getCode(),
processInstance.getId(),
taskInstance.getProcessInstancePriority().getCode(),
@@ -134,7 +134,7 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
taskPriority.setTaskExecutionContext(taskExecutionContext);
taskUpdateQueue.put(taskPriority);
- logger.info("master submit success, task : {}",
taskInstance.getName());
+ logger.info("Master submit task to priority queue success,
taskInstanceId : {}", taskInstance.getId());
return true;
} catch (Exception e) {
logger.error("submit task error", e);
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java
index 542697a92f..41c2bd56d3 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java
@@ -21,8 +21,9 @@ import static
org.apache.dolphinscheduler.common.Constants.COMMON_TASK_TYPE;
import org.apache.commons.lang3.StringUtils;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
import java.util.Map;
-import java.util.Objects;
import java.util.ServiceLoader;
import java.util.concurrent.ConcurrentHashMap;
@@ -36,27 +37,31 @@ public final class TaskProcessorFactory {
private static final Logger logger =
LoggerFactory.getLogger(TaskProcessorFactory.class);
- public static final Map<String, ITaskProcessor> PROCESS_MAP = new
ConcurrentHashMap<>();
+ public static final Map<String, Constructor<ITaskProcessor>> PROCESS_MAP =
new ConcurrentHashMap<>();
private static final String DEFAULT_PROCESSOR = COMMON_TASK_TYPE;
static {
for (ITaskProcessor iTaskProcessor :
ServiceLoader.load(ITaskProcessor.class)) {
- PROCESS_MAP.put(iTaskProcessor.getType(), iTaskProcessor);
+ try {
+ PROCESS_MAP.put(iTaskProcessor.getType(),
(Constructor<ITaskProcessor>) iTaskProcessor.getClass().getConstructor());
+ } catch (NoSuchMethodException e) {
+ throw new IllegalArgumentException("The task processor should
has a no args constructor");
+ }
}
}
- public static ITaskProcessor getTaskProcessor(String type) throws
InstantiationException, IllegalAccessException {
+ public static ITaskProcessor getTaskProcessor(String type) throws
InvocationTargetException, InstantiationException, IllegalAccessException {
if (StringUtils.isEmpty(type)) {
type = DEFAULT_PROCESSOR;
}
- ITaskProcessor iTaskProcessor = PROCESS_MAP.get(type);
- if (Objects.isNull(iTaskProcessor)) {
- logger.warn("task processor not found for type: {}", type);
- return PROCESS_MAP.get(DEFAULT_PROCESSOR);
+ Constructor<ITaskProcessor> iTaskProcessorConstructor =
PROCESS_MAP.get(type);
+ if (iTaskProcessorConstructor == null) {
+ logger.warn("ITaskProcessor could not found for taskType: {}",
type);
+ iTaskProcessorConstructor = PROCESS_MAP.get(DEFAULT_PROCESSOR);
}
- return iTaskProcessor.getClass().newInstance();
+ return iTaskProcessorConstructor.newInstance();
}
/**
diff --git
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java
index d0371809cc..b974a40b31 100644
---
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java
+++
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java
@@ -19,6 +19,8 @@ package org.apache.dolphinscheduler.server.master.runner.task;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import java.lang.reflect.InvocationTargetException;
+
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
@@ -27,7 +29,7 @@ import org.junit.Test;
public class TaskProcessorFactoryTest {
@Test
- public void testFactory() throws InstantiationException,
IllegalAccessException {
+ public void testFactory() throws InvocationTargetException,
InstantiationException, IllegalAccessException {
TaskInstance taskInstance = new TaskInstance();
taskInstance.setTaskType("shell");
diff --git
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/ChannelUtils.java
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/ChannelUtils.java
index 239a3993c0..b4177ec25d 100644
---
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/ChannelUtils.java
+++
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/ChannelUtils.java
@@ -21,6 +21,9 @@ import org.apache.dolphinscheduler.common.utils.NetUtils;
import java.net.InetSocketAddress;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import io.netty.channel.Channel;
/**
@@ -28,6 +31,8 @@ import io.netty.channel.Channel;
*/
public class ChannelUtils {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ChannelUtils.class);
+
private ChannelUtils() {
throw new IllegalStateException(ChannelUtils.class.getName());
}
@@ -49,7 +54,7 @@ public class ChannelUtils {
* @return remote address
*/
public static String getRemoteAddress(Channel channel) {
- return NetUtils.getHost(((InetSocketAddress)
channel.remoteAddress()).getAddress());
+ return toAddress(channel).getAddress();
}
/**
@@ -60,6 +65,11 @@ public class ChannelUtils {
*/
public static Host toAddress(Channel channel) {
InetSocketAddress socketAddress = ((InetSocketAddress)
channel.remoteAddress());
+ if (socketAddress == null) {
+ // the remote channel already closed
+ LOGGER.warn("The channel is already closed");
+ return Host.EMPTY;
+ }
return new Host(NetUtils.getHost(socketAddress.getAddress()),
socketAddress.getPort());
}
diff --git
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java
index 2163e9c7d8..dc8e1f0d36 100644
---
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java
+++
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java
@@ -27,6 +27,8 @@ import java.util.Objects;
*/
public class Host implements Serializable {
+ public static final Host EMPTY = new Host();
+
/**
* address
*/
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index 3760bde2b6..b42e033b20 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -766,7 +766,8 @@ public class ProcessServiceImpl implements ProcessService {
processInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
processInstance.setRecovery(Flag.NO);
processInstance.setStartTime(new Date());
- processInstance.setRestartTime(processInstance.getStartTime());
+ // the new process instance restart time is null.
+ processInstance.setRestartTime(null);
processInstance.setRunTimes(1);
processInstance.setMaxTryTimes(0);
processInstance.setCommandParam(command.getCommandParam());
@@ -1285,7 +1286,7 @@ public class ProcessServiceImpl implements ProcessService
{
@Override
@Transactional(rollbackFor = Exception.class)
public TaskInstance submitTask(ProcessInstance processInstance,
TaskInstance taskInstance) {
- logger.info("start submit task : {}, instance id:{}, state: {}",
+ logger.info("start submit task : {}, processInstance id:{}, state: {}",
taskInstance.getName(), taskInstance.getProcessInstanceId(),
processInstance.getState());
//submit to db
TaskInstance task = submitTaskInstanceToDB(taskInstance,
processInstance);
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java
index 231fd2a20f..2e939ee332 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java
@@ -21,11 +21,13 @@ import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import
org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException;
+import java.util.Collections;
import java.util.Comparator;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.PriorityQueue;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantLock;
/**
* Task instances priority queue implementation
@@ -40,12 +42,8 @@ public class PeerTaskInstancePriorityQueue implements
TaskPriorityQueue<TaskInst
/**
* queue
*/
- private PriorityQueue<TaskInstance> queue = new
PriorityQueue<>(QUEUE_MAX_SIZE, new TaskInfoComparator());
-
- /**
- * Lock used for all public operations
- */
- private final ReentrantLock lock = new ReentrantLock(true);
+ private final PriorityQueue<TaskInstance> queue = new
PriorityQueue<>(QUEUE_MAX_SIZE, new TaskInfoComparator());
+ private final Set<Integer> taskInstanceIdSet =
Collections.synchronizedSet(new HashSet<>());
/**
* put task instance to priority queue
@@ -56,6 +54,7 @@ public class PeerTaskInstancePriorityQueue implements
TaskPriorityQueue<TaskInst
@Override
public void put(TaskInstance taskInstance) throws
TaskPriorityQueueException {
queue.add(taskInstance);
+ taskInstanceIdSet.add(taskInstance.getId());
}
/**
@@ -66,7 +65,11 @@ public class PeerTaskInstancePriorityQueue implements
TaskPriorityQueue<TaskInst
*/
@Override
public TaskInstance take() throws TaskPriorityQueueException {
- return queue.poll();
+ TaskInstance taskInstance = queue.poll();
+ if (taskInstance != null) {
+ taskInstanceIdSet.remove(taskInstance.getId());
+ }
+ return taskInstance;
}
/**
@@ -111,6 +114,7 @@ public class PeerTaskInstancePriorityQueue implements
TaskPriorityQueue<TaskInst
*/
public void clear() {
queue.clear();
+ taskInstanceIdSet.clear();
}
/**
@@ -120,20 +124,11 @@ public class PeerTaskInstancePriorityQueue implements
TaskPriorityQueue<TaskInst
* @return true is contains
*/
public boolean contains(TaskInstance taskInstance) {
- return this.contains(taskInstance.getTaskCode(),
taskInstance.getTaskDefinitionVersion());
+ return this.contains(taskInstance.getId());
}
- public boolean contains(long taskCode, int taskVersion) {
- Iterator<TaskInstance> iterator = this.queue.iterator();
- while (iterator.hasNext()) {
- TaskInstance taskInstance = iterator.next();
- if (taskCode == taskInstance.getTaskCode()
- && taskVersion == taskInstance.getTaskDefinitionVersion())
{
- return true;
- }
- }
- return false;
-
+ public boolean contains(int taskInstanceId) {
+ return taskInstanceIdSet.contains(taskInstanceId);
}
/**
diff --git
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java
index 8da3a6c194..67e40d1189 100644
---
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java
+++
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java
@@ -46,14 +46,11 @@ public class PeerTaskInstancePriorityQueueTest {
Assert.assertTrue(queue.size() < peekBeforeLength);
}
- @Test
+
+ @Test(expected = TaskPriorityQueueException.class)
public void poll() throws Exception {
PeerTaskInstancePriorityQueue queue =
getPeerTaskInstancePriorityQueue();
- try {
- queue.poll(1000, TimeUnit.MILLISECONDS);
- } catch (TaskPriorityQueueException e) {
- e.printStackTrace();
- }
+ queue.poll(1000, TimeUnit.MILLISECONDS);
}
@Test
diff --git
a/dolphinscheduler-standalone-server/src/main/resources/application.yaml
b/dolphinscheduler-standalone-server/src/main/resources/application.yaml
index d968090e46..945cae26bb 100644
--- a/dolphinscheduler-standalone-server/src/main/resources/application.yaml
+++ b/dolphinscheduler-standalone-server/src/main/resources/application.yaml
@@ -109,7 +109,7 @@ master:
# master prepare execute thread number to limit handle commands in parallel
pre-exec-threads: 10
# master execute thread number to limit process instances in parallel
- exec-threads: 100
+ exec-threads: 10
# master dispatch task number per batch
dispatch-task-number: 3
# master host selector to select a suitable worker, default value:
LowerWeight. Optional values include random, round_robin, lower_weight
@@ -134,7 +134,7 @@ worker:
# worker listener port
listen-port: 1234
# worker execute thread number to limit task instances in parallel
- exec-threads: 100
+ exec-threads: 10
# worker heartbeat interval, the unit is second
heartbeat-interval: 10
# worker host weight to dispatch tasks, default value 100
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java
index 94a3ffe43a..441deb495b 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java
@@ -34,6 +34,7 @@ import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.SystemUtils;
import java.io.File;
+import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
@@ -147,7 +148,11 @@ public class ShellTask extends AbstractTaskExecutor {
if (!file.getParentFile().exists()) {
file.getParentFile().mkdirs();
}
- Files.createFile(path, attr);
+ try {
+ Files.createFile(path, attr);
+ } catch (FileAlreadyExistsException ex) {
+ // this is expected
+ }
}
Files.write(path, shellParameters.getRawScript().getBytes(),
StandardOpenOption.APPEND);
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
index 3eb112bb07..059aa64ce6 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
@@ -21,9 +21,9 @@ import java.util.Set;
import org.springframework.boot.context.properties.ConfigurationProperties;
import
org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.stereotype.Component;
+import org.springframework.context.annotation.Configuration;
-@Component
+@Configuration
@EnableConfigurationProperties
@ConfigurationProperties("worker")
public class WorkerConfig {
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
index 448f62c8c2..8264ea52f0 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
@@ -163,6 +163,8 @@ public class TaskCallbackService {
}
}
});
+ } else {
+ logger.warn("Remote channel of taskInstanceId is null: {}, cannot
send command: {}", taskInstanceId, command);
}
}
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
index 48c590cbd7..df75caa3bb 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
@@ -19,12 +19,15 @@ package org.apache.dolphinscheduler.server.worker.runner;
import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
-import org.apache.commons.lang3.tuple.Pair;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.WarningType;
import
org.apache.dolphinscheduler.common.exception.StorageOperateNoConfiguredException;
import org.apache.dolphinscheduler.common.storage.StorageOperate;
-import org.apache.dolphinscheduler.common.utils.*;
+import org.apache.dolphinscheduler.common.utils.CommonUtils;
+import org.apache.dolphinscheduler.common.utils.DateUtils;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.LoggerUtils;
+import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
@@ -39,17 +42,26 @@ import
org.apache.dolphinscheduler.service.exceptions.ServiceException;
import org.apache.dolphinscheduler.service.task.TaskPluginManager;
import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang3.tuple.Pair;
import java.io.File;
import java.io.IOException;
-import java.util.*;
+import java.nio.file.NoSuchFileException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
-import com.google.common.base.Strings;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Strings;
+
/**
* task scheduler thread
*/
@@ -232,7 +244,11 @@ public class TaskExecuteThread implements Runnable,
Delayed {
org.apache.commons.io.FileUtils.deleteDirectory(new
File(execLocalPath));
logger.info("exec local path: {} cleared.", execLocalPath);
} catch (IOException e) {
- logger.error("delete exec dir failed : {}", e.getMessage(), e);
+ if (e instanceof NoSuchFileException) {
+ // this is expected
+ } else {
+ logger.error("Delete exec dir failed.", e);
+ }
}
}
}
@@ -263,7 +279,7 @@ public class TaskExecuteThread implements Runnable, Delayed
{
task.cancelApplication(true);
ProcessUtils.killYarnJob(taskExecutionContext);
} catch (Exception e) {
- logger.error(e.getMessage(), e);
+ logger.error("Kill task failed", e);
}
}
}