This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new ad2646ff1f Fix TaskProcessorFactory#getTaskProcessor get common processor is not thread safe (#10479)
ad2646ff1f is described below

commit ad2646ff1f7baa5d76d29023ced2c28a89b52f6b
Author: Wenjun Ruan <[email protected]>
AuthorDate: Thu Jun 16 21:46:18 2022 +0800

    Fix TaskProcessorFactory#getTaskProcessor get common processor is not thread safe (#10479)
    
    * Fix TaskProcessorFactory#getTaskProcessor get common processor is not thread safe
---
 .../dolphinscheduler/common/utils/FileUtils.java   |  29 +++++-
 .../dao/entity/ProcessInstance.java                |  12 +--
 .../master/consumer/TaskPriorityQueueConsumer.java |   8 +-
 .../server/master/dispatch/ExecutorDispatcher.java |   2 -
 .../server/master/processor/queue/TaskEvent.java   | 106 +--------------------
 .../processor/queue/TaskExecuteThreadPool.java     |  22 +++--
 .../master/runner/WorkflowExecuteRunnable.java     |  66 +++++++------
 .../master/runner/WorkflowExecuteThreadPool.java   |   2 +-
 .../master/runner/task/CommonTaskProcessor.java    |   4 +-
 .../master/runner/task/TaskProcessorFactory.java   |  23 +++--
 .../runner/task/TaskProcessorFactoryTest.java      |   4 +-
 .../remote/utils/ChannelUtils.java                 |  12 ++-
 .../apache/dolphinscheduler/remote/utils/Host.java |   2 +
 .../service/process/ProcessServiceImpl.java        |   5 +-
 .../queue/PeerTaskInstancePriorityQueue.java       |  35 +++----
 .../queue/PeerTaskInstancePriorityQueueTest.java   |   9 +-
 .../src/main/resources/application.yaml            |   4 +-
 .../plugin/task/shell/ShellTask.java               |   7 +-
 .../server/worker/config/WorkerConfig.java         |   4 +-
 .../worker/processor/TaskCallbackService.java      |   2 +
 .../server/worker/runner/TaskExecuteThread.java    |  28 ++++--
 21 files changed, 173 insertions(+), 213 deletions(-)

diff --git 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java
 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java
index bdcf62f76a..23e4b74b75 100644
--- 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java
+++ 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java
@@ -17,14 +17,25 @@
 
 package org.apache.dolphinscheduler.common.utils;
 
+import static org.apache.dolphinscheduler.common.Constants.DATA_BASEDIR_PATH;
+import static org.apache.dolphinscheduler.common.Constants.FOLDER_SEPARATOR;
+import static 
org.apache.dolphinscheduler.common.Constants.RESOURCE_VIEW_SUFFIXES;
+import static 
org.apache.dolphinscheduler.common.Constants.RESOURCE_VIEW_SUFFIXES_DEFAULT_VALUE;
+import static org.apache.dolphinscheduler.common.Constants.UTF_8;
+import static org.apache.dolphinscheduler.common.Constants.YYYYMMDDHHMMSS;
+
 import org.apache.commons.io.IOUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import java.io.*;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
+import java.nio.file.NoSuchFileException;
 
-import static org.apache.dolphinscheduler.common.Constants.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * file utils
@@ -112,7 +123,15 @@ public class FileUtils {
         File execLocalPathFile = new File(execLocalPath);
 
         if (execLocalPathFile.exists()) {
-            org.apache.commons.io.FileUtils.forceDelete(execLocalPathFile);
+            try {
+                org.apache.commons.io.FileUtils.forceDelete(execLocalPathFile);
+            } catch (Exception ex) {
+                if (ex instanceof NoSuchFileException || ex.getCause() 
instanceof NoSuchFileException) {
+                    // this file is already be deleted.
+                } else {
+                    throw ex;
+                }
+            }
         }
 
         //create work dir
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java
index 29971f94db..054df92e4f 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java
@@ -265,13 +265,11 @@ public class ProcessInstance {
      */
     public ProcessInstance(ProcessDefinition processDefinition) {
         this.processDefinition = processDefinition;
-        this.name = processDefinition.getName()
-            + "-"
-            +
-            processDefinition.getVersion()
-            + "-"
-            +
-            DateUtils.getCurrentTimeStamp();
+        // todo: the name is not unique
+        this.name = String.join("-",
+                processDefinition.getName(),
+                String.valueOf(processDefinition.getVersion()),
+                DateUtils.getCurrentTimeStamp());
     }
 
     public String getVarPool() {
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
index c1b54df62f..d75595de1a 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
@@ -190,16 +190,18 @@ public class TaskPriorityQueueConsumer extends Thread {
                     return true;
                 }
             }
-
             result = dispatcher.dispatch(executionContext);
 
             if (result) {
+                logger.info("Master success dispatch task to worker, 
taskInstanceId: {}", taskPriority.getTaskId());
                 addDispatchEvent(context, executionContext);
+            } else {
+                logger.info("Master failed to dispatch task to worker, 
taskInstanceId: {}", taskPriority.getTaskId());
             }
         } catch (RuntimeException e) {
-            logger.error("dispatch error: ", e);
+            logger.error("Master dispatch task to worker error: ", e);
         } catch (ExecuteException e) {
-            logger.error("dispatch error: {}", e.getMessage());
+            logger.error("Master dispatch task to worker error: {}", e);
         }
         return result;
     }
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java
index 25db4eb8a6..d439c28267 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java
@@ -17,8 +17,6 @@
 
 package org.apache.dolphinscheduler.server.master.dispatch;
 
-import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.remote.utils.Host;
 import 
org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
 import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java
index 3ed41329fe..842bcaf333 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java
@@ -27,10 +27,12 @@ import 
org.apache.dolphinscheduler.remote.utils.ChannelUtils;
 import java.util.Date;
 
 import io.netty.channel.Channel;
+import lombok.Data;
 
 /**
  * task event
  */
+@Data
 public class TaskEvent {
 
     /**
@@ -144,108 +146,4 @@ public class TaskEvent {
         event.setEvent(Event.WORKER_REJECT);
         return event;
     }
-
-    public String getVarPool() {
-        return varPool;
-    }
-
-    public void setVarPool(String varPool) {
-        this.varPool = varPool;
-    }
-
-    public int getTaskInstanceId() {
-        return taskInstanceId;
-    }
-
-    public void setTaskInstanceId(int taskInstanceId) {
-        this.taskInstanceId = taskInstanceId;
-    }
-
-    public String getWorkerAddress() {
-        return workerAddress;
-    }
-
-    public void setWorkerAddress(String workerAddress) {
-        this.workerAddress = workerAddress;
-    }
-
-    public ExecutionStatus getState() {
-        return state;
-    }
-
-    public void setState(ExecutionStatus state) {
-        this.state = state;
-    }
-
-    public Date getStartTime() {
-        return startTime;
-    }
-
-    public void setStartTime(Date startTime) {
-        this.startTime = startTime;
-    }
-
-    public Date getEndTime() {
-        return endTime;
-    }
-
-    public void setEndTime(Date endTime) {
-        this.endTime = endTime;
-    }
-
-    public String getExecutePath() {
-        return executePath;
-    }
-
-    public void setExecutePath(String executePath) {
-        this.executePath = executePath;
-    }
-
-    public String getLogPath() {
-        return logPath;
-    }
-
-    public void setLogPath(String logPath) {
-        this.logPath = logPath;
-    }
-
-    public int getProcessId() {
-        return processId;
-    }
-
-    public void setProcessId(int processId) {
-        this.processId = processId;
-    }
-
-    public String getAppIds() {
-        return appIds;
-    }
-
-    public void setAppIds(String appIds) {
-        this.appIds = appIds;
-    }
-
-    public Event getEvent() {
-        return event;
-    }
-
-    public void setEvent(Event event) {
-        this.event = event;
-    }
-
-    public Channel getChannel() {
-        return channel;
-    }
-
-    public void setChannel(Channel channel) {
-        this.channel = channel;
-    }
-
-    public int getProcessInstanceId() {
-        return processInstanceId;
-    }
-
-    public void setProcessInstanceId(int processInstanceId) {
-        this.processInstanceId = processInstanceId;
-    }
 }
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java
index 6bf3f862ae..323ea86411 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java
@@ -79,7 +79,7 @@ public class TaskExecuteThreadPool extends 
ThreadPoolTaskExecutor {
 
     public void submitTaskEvent(TaskEvent taskEvent) {
         if 
(!processInstanceExecCacheManager.contains(taskEvent.getProcessInstanceId())) {
-            logger.warn("workflowExecuteThread is null, event: {}", taskEvent);
+            logger.warn("Cannot find workflowExecuteThread from cacheManager, 
event: {}", taskEvent);
             return;
         }
         if 
(!taskExecuteThreadMap.containsKey(taskEvent.getProcessInstanceId())) {
@@ -114,20 +114,24 @@ public class TaskExecuteThreadPool extends 
ThreadPoolTaskExecutor {
         future.addCallback(new ListenableFutureCallback() {
             @Override
             public void onFailure(Throwable ex) {
-                logger.error("handle event {} failed: {}", 
taskExecuteThread.getProcessInstanceId(), ex);
-                if 
(!processInstanceExecCacheManager.contains(taskExecuteThread.getProcessInstanceId()))
 {
-                    
taskExecuteThreadMap.remove(taskExecuteThread.getProcessInstanceId());
-                    logger.info("remove process instance: {}", 
taskExecuteThread.getProcessInstanceId());
+                Integer processInstanceId = 
taskExecuteThread.getProcessInstanceId();
+                logger.error("persist event failed processInstanceId: {}", 
processInstanceId, ex);
+                if 
(!processInstanceExecCacheManager.contains(processInstanceId)) {
+                    taskExecuteThreadMap.remove(processInstanceId);
+                    logger.info("Cannot find processInstance from 
cacheManager, remove process instance from threadMap: {}",
+                            processInstanceId);
                 }
                 multiThreadFilterMap.remove(taskExecuteThread.getKey());
             }
 
             @Override
             public void onSuccess(Object result) {
-                logger.info("persist events {} succeeded.", 
taskExecuteThread.getProcessInstanceId());
-                if 
(!processInstanceExecCacheManager.contains(taskExecuteThread.getProcessInstanceId()))
 {
-                    
taskExecuteThreadMap.remove(taskExecuteThread.getProcessInstanceId());
-                    logger.info("remove process instance: {}", 
taskExecuteThread.getProcessInstanceId());
+                Integer processInstanceId = 
taskExecuteThread.getProcessInstanceId();
+                logger.info("persist events succeeded, processInstanceId: {}", 
processInstanceId);
+                if 
(!processInstanceExecCacheManager.contains(processInstanceId)) {
+                    taskExecuteThreadMap.remove(processInstanceId);
+                    logger.info("Cannot find processInstance from 
cacheManager, remove process instance from threadMap: {}",
+                            processInstanceId);
                 }
                 multiThreadFilterMap.remove(taskExecuteThread.getKey());
             }
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index 3365e67c35..ae627c44f5 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -17,10 +17,19 @@
 
 package org.apache.dolphinscheduler.server.master.runner;
 
-import com.google.common.collect.Lists;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.math.NumberUtils;
+import static 
org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
+import static 
org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST;
+import static 
org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
+import static 
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING;
+import static 
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
+import static 
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES;
+import static org.apache.dolphinscheduler.common.Constants.COMMA;
+import static 
org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
+import static org.apache.dolphinscheduler.common.Constants.YYYY_MM_DD_HH_MM_SS;
+import static 
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_BLOCKING;
+import static 
org.apache.dolphinscheduler.plugin.task.api.enums.DataType.VARCHAR;
+import static org.apache.dolphinscheduler.plugin.task.api.enums.Direct.IN;
+
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.CommandType;
 import org.apache.dolphinscheduler.common.enums.FailureStrategy;
@@ -69,8 +78,10 @@ import 
org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
 import org.apache.dolphinscheduler.service.corn.CronUtils;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.math.NumberUtils;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -88,18 +99,10 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import static 
org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
-import static 
org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST;
-import static 
org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
-import static 
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING;
-import static 
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
-import static 
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES;
-import static org.apache.dolphinscheduler.common.Constants.COMMA;
-import static 
org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
-import static org.apache.dolphinscheduler.common.Constants.YYYY_MM_DD_HH_MM_SS;
-import static 
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_BLOCKING;
-import static 
org.apache.dolphinscheduler.plugin.task.api.enums.DataType.VARCHAR;
-import static org.apache.dolphinscheduler.plugin.task.api.enums.Direct.IN;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
 
 /**
  * Workflow execute task, used to execute a workflow instance.
@@ -1001,7 +1004,7 @@ public class WorkflowExecuteRunnable implements Runnable {
      * @param taskInstance task instance
      * @return TaskInstance
      */
-    private TaskInstance submitTaskExec(TaskInstance taskInstance) {
+    private Optional<TaskInstance> submitTaskExec(TaskInstance taskInstance) {
         try {
             // package task instance before submit
             processService.packageTaskInstance(taskInstance, processInstance);
@@ -1019,7 +1022,7 @@ public class WorkflowExecuteRunnable implements Runnable {
                 logger.error("process id:{} name:{} submit standby task id:{} 
name:{} failed!",
                         processInstance.getId(), processInstance.getName(),
                         taskInstance.getId(), taskInstance.getName());
-                return null;
+                return Optional.empty();
             }
 
             // in a dag, only one taskInstance is valid per taskCode, so need 
to set the old taskInstance invalid
@@ -1058,10 +1061,10 @@ public class WorkflowExecuteRunnable implements 
Runnable {
                 taskStateChangeEvent.setType(StateEventType.TASK_STATE_CHANGE);
                 this.stateEvents.add(taskStateChangeEvent);
             }
-            return taskInstance;
+            return Optional.of(taskInstance);
         } catch (Exception e) {
             logger.error("submit standby task error", e);
-            return null;
+            return Optional.empty();
         }
     }
 
@@ -1360,6 +1363,7 @@ public class WorkflowExecuteRunnable implements Runnable {
         for (TaskInstance task : taskInstances) {
 
             if (readyToSubmitTaskQueue.contains(task)) {
+                logger.warn("Task is already at submit queue, taskInstanceId: 
{}", task.getId());
                 continue;
             }
 
@@ -1764,9 +1768,6 @@ public class WorkflowExecuteRunnable implements Runnable {
      * @param taskInstance task instance
      */
     private void removeTaskFromStandbyList(TaskInstance taskInstance) {
-        logger.info("remove task from stand by list, id: {} name:{}",
-                taskInstance.getId(),
-                taskInstance.getName());
         try {
             readyToSubmitTaskQueue.remove(taskInstance);
         } catch (Exception e) {
@@ -1859,14 +1860,14 @@ public class WorkflowExecuteRunnable implements 
Runnable {
                 }
                 DependResult dependResult = getDependResultForTask(task);
                 if (DependResult.SUCCESS == dependResult) {
-                    TaskInstance taskInstance = submitTaskExec(task);
-                    if (taskInstance == null) {
+                    Optional<TaskInstance> taskInstanceOptional = 
submitTaskExec(task);
+                    if (!taskInstanceOptional.isPresent()) {
                         this.taskFailedSubmit = true;
                         // Remove and add to complete map and error map
                         removeTaskFromStandbyList(task);
                         completeTaskMap.put(task.getTaskCode(), task.getId());
                         errorTaskMap.put(task.getTaskCode(), task.getId());
-                        logger.error("process {}, task {}, code:{} submit task 
failed.", task.getProcessInstanceId(), task.getName(), task.getTaskCode());
+                        logger.error("Task submitted failed, 
processInstanceId: {}, taskInstanceId: {}", task.getProcessInstanceId(), 
task.getId());
                     } else {
                         removeTaskFromStandbyList(task);
                     }
@@ -1874,11 +1875,11 @@ public class WorkflowExecuteRunnable implements 
Runnable {
                     // if the dependency fails, the current node is not 
submitted and the state changes to failure.
                     dependFailedTaskMap.put(task.getTaskCode(), task.getId());
                     removeTaskFromStandbyList(task);
-                    logger.info("task {},id:{} depend result : {}", 
task.getName(), task.getId(), dependResult);
+                    logger.info("Task dependent result is failed, 
taskInstanceId:{} depend result : {}", task.getId(), dependResult);
                 } else if (DependResult.NON_EXEC == dependResult) {
                     // for some reasons(depend task pause/stop) this task 
would not be submit
                     removeTaskFromStandbyList(task);
-                    logger.info("remove task {},id:{} , because depend result 
: {}", task.getName(), task.getId(), dependResult);
+                    logger.info("Remove task due to depend result not 
executed, taskInstanceId:{} depend result : {}", task.getId(), dependResult);
                 }
             }
         } catch (Exception e) {
@@ -2067,6 +2068,11 @@ public class WorkflowExecuteRunnable implements Runnable 
{
     }
 
     private void measureTaskState(StateEvent taskStateEvent) {
+        if (taskStateEvent == null || taskStateEvent.getExecutionStatus() == 
null) {
+            // the event is broken
+            logger.warn("The task event is broken..., taskEvent: {}", 
taskStateEvent);
+            return;
+        }
         if (taskStateEvent.getExecutionStatus().typeIsFinished()) {
             TaskMetrics.incTaskFinish();
         }
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
index 849c3a23b3..b658f669f7 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
@@ -105,7 +105,7 @@ public class WorkflowExecuteThreadPool extends 
ThreadPoolTaskExecutor {
     /**
      * Handle the events belong to the given workflow.
      */
-    public void executeEvent(WorkflowExecuteRunnable workflowExecuteThread) {
+    public void executeEvent(final WorkflowExecuteRunnable 
workflowExecuteThread) {
         if (!workflowExecuteThread.isStart() || 
workflowExecuteThread.eventSize() == 0) {
             return;
         }
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
index fd803ac371..fcfe7c67bf 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
@@ -119,7 +119,7 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
                 logger.info("submit task, but the status of the task {} is 
already running or delayed.", taskInstance.getName());
                 return true;
             }
-            logger.debug("task ready to submit: {}", taskInstance.getName());
+            logger.info("task ready to submit: taskInstanceId: {}", 
taskInstance.getId());
 
             TaskPriority taskPriority = new 
TaskPriority(processInstance.getProcessInstancePriority().getCode(),
                     processInstance.getId(), 
taskInstance.getProcessInstancePriority().getCode(),
@@ -134,7 +134,7 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
             taskPriority.setTaskExecutionContext(taskExecutionContext);
 
             taskUpdateQueue.put(taskPriority);
-            logger.info("master submit success, task : {}", 
taskInstance.getName());
+            logger.info("Master submit task to priority queue success, 
taskInstanceId : {}", taskInstance.getId());
             return true;
         } catch (Exception e) {
             logger.error("submit task error", e);
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java
index 542697a92f..41c2bd56d3 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java
@@ -21,8 +21,9 @@ import static 
org.apache.dolphinscheduler.common.Constants.COMMON_TASK_TYPE;
 
 import org.apache.commons.lang3.StringUtils;
 
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
 import java.util.Map;
-import java.util.Objects;
 import java.util.ServiceLoader;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -36,27 +37,31 @@ public final class TaskProcessorFactory {
 
     private static final Logger logger = 
LoggerFactory.getLogger(TaskProcessorFactory.class);
 
-    public static final Map<String, ITaskProcessor> PROCESS_MAP = new 
ConcurrentHashMap<>();
+    public static final Map<String, Constructor<ITaskProcessor>> PROCESS_MAP = 
new ConcurrentHashMap<>();
 
     private static final String DEFAULT_PROCESSOR = COMMON_TASK_TYPE;
 
     static {
         for (ITaskProcessor iTaskProcessor : 
ServiceLoader.load(ITaskProcessor.class)) {
-            PROCESS_MAP.put(iTaskProcessor.getType(), iTaskProcessor);
+            try {
+                PROCESS_MAP.put(iTaskProcessor.getType(), 
(Constructor<ITaskProcessor>) iTaskProcessor.getClass().getConstructor());
+            } catch (NoSuchMethodException e) {
+                throw new IllegalArgumentException("The task processor should 
has a no args constructor");
+            }
         }
     }
 
-    public static ITaskProcessor getTaskProcessor(String type) throws 
InstantiationException, IllegalAccessException {
+    public static ITaskProcessor getTaskProcessor(String type) throws 
InvocationTargetException, InstantiationException, IllegalAccessException {
         if (StringUtils.isEmpty(type)) {
             type = DEFAULT_PROCESSOR;
         }
-        ITaskProcessor iTaskProcessor = PROCESS_MAP.get(type);
-        if (Objects.isNull(iTaskProcessor)) {
-            logger.warn("task processor not found for type: {}", type);
-            return PROCESS_MAP.get(DEFAULT_PROCESSOR);
+        Constructor<ITaskProcessor> iTaskProcessorConstructor = 
PROCESS_MAP.get(type);
+        if (iTaskProcessorConstructor == null) {
+            logger.warn("ITaskProcessor could not found for taskType: {}", 
type);
+            iTaskProcessorConstructor = PROCESS_MAP.get(DEFAULT_PROCESSOR);
         }
 
-        return iTaskProcessor.getClass().newInstance();
+        return iTaskProcessorConstructor.newInstance();
     }
 
     /**
diff --git 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java
 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java
index d0371809cc..b974a40b31 100644
--- 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java
+++ 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java
@@ -19,6 +19,8 @@ package org.apache.dolphinscheduler.server.master.runner.task;
 
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 
+import java.lang.reflect.InvocationTargetException;
+
 import org.junit.Assert;
 import org.junit.Ignore;
 import org.junit.Test;
@@ -27,7 +29,7 @@ import org.junit.Test;
 public class TaskProcessorFactoryTest {
 
     @Test
-    public void testFactory() throws InstantiationException, 
IllegalAccessException {
+    public void testFactory() throws InvocationTargetException, 
InstantiationException, IllegalAccessException {
 
         TaskInstance taskInstance = new TaskInstance();
         taskInstance.setTaskType("shell");
diff --git 
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/ChannelUtils.java
 
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/ChannelUtils.java
index 239a3993c0..b4177ec25d 100644
--- 
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/ChannelUtils.java
+++ 
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/ChannelUtils.java
@@ -21,6 +21,9 @@ import org.apache.dolphinscheduler.common.utils.NetUtils;
 
 import java.net.InetSocketAddress;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import io.netty.channel.Channel;
 
 /**
@@ -28,6 +31,8 @@ import io.netty.channel.Channel;
  */
 public class ChannelUtils {
 
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(ChannelUtils.class);
+
     private ChannelUtils() {
         throw new IllegalStateException(ChannelUtils.class.getName());
     }
@@ -49,7 +54,7 @@ public class ChannelUtils {
      * @return remote address
      */
     public static String getRemoteAddress(Channel channel) {
-        return NetUtils.getHost(((InetSocketAddress) 
channel.remoteAddress()).getAddress());
+        return toAddress(channel).getAddress();
     }
 
     /**
@@ -60,6 +65,11 @@ public class ChannelUtils {
      */
     public static Host toAddress(Channel channel) {
         InetSocketAddress socketAddress = ((InetSocketAddress) 
channel.remoteAddress());
+        if (socketAddress == null) {
+            // the remote channel already closed
+            LOGGER.warn("The channel is already closed");
+            return Host.EMPTY;
+        }
         return new Host(NetUtils.getHost(socketAddress.getAddress()), 
socketAddress.getPort());
     }
 
diff --git 
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java
 
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java
index 2163e9c7d8..dc8e1f0d36 100644
--- 
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java
+++ 
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java
@@ -27,6 +27,8 @@ import java.util.Objects;
  */
 public class Host implements Serializable {
 
+    public static final Host EMPTY = new Host();
+
     /**
      * address
      */
diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index 3760bde2b6..b42e033b20 100644
--- 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -766,7 +766,8 @@ public class ProcessServiceImpl implements ProcessService {
         processInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
         processInstance.setRecovery(Flag.NO);
         processInstance.setStartTime(new Date());
-        processInstance.setRestartTime(processInstance.getStartTime());
+        // the new process instance restart time is null.
+        processInstance.setRestartTime(null);
         processInstance.setRunTimes(1);
         processInstance.setMaxTryTimes(0);
         processInstance.setCommandParam(command.getCommandParam());
@@ -1285,7 +1286,7 @@ public class ProcessServiceImpl implements ProcessService 
{
     @Override
     @Transactional(rollbackFor = Exception.class)
     public TaskInstance submitTask(ProcessInstance processInstance, 
TaskInstance taskInstance) {
-        logger.info("start submit task : {}, instance id:{}, state: {}",
+        logger.info("start submit task : {}, processInstance id:{}, state: {}",
             taskInstance.getName(), taskInstance.getProcessInstanceId(), 
processInstance.getState());
         //submit to db
         TaskInstance task = submitTaskInstanceToDB(taskInstance, 
processInstance);
diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java
index 231fd2a20f..2e939ee332 100644
--- 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java
+++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java
@@ -21,11 +21,13 @@ import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import 
org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException;
 
+import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.PriorityQueue;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantLock;
 
 /**
  * Task instances priority queue implementation
@@ -40,12 +42,8 @@ public class PeerTaskInstancePriorityQueue implements 
TaskPriorityQueue<TaskInst
     /**
      * queue
      */
-    private PriorityQueue<TaskInstance> queue = new 
PriorityQueue<>(QUEUE_MAX_SIZE, new TaskInfoComparator());
-
-    /**
-     * Lock used for all public operations
-     */
-    private final ReentrantLock lock = new ReentrantLock(true);
+    private final PriorityQueue<TaskInstance> queue = new 
PriorityQueue<>(QUEUE_MAX_SIZE, new TaskInfoComparator());
+    private final Set<Integer> taskInstanceIdSet = 
Collections.synchronizedSet(new HashSet<>());
 
     /**
      * put task instance to priority queue
@@ -56,6 +54,7 @@ public class PeerTaskInstancePriorityQueue implements 
TaskPriorityQueue<TaskInst
     @Override
     public void put(TaskInstance taskInstance) throws 
TaskPriorityQueueException {
         queue.add(taskInstance);
+        taskInstanceIdSet.add(taskInstance.getId());
     }
 
     /**
@@ -66,7 +65,11 @@ public class PeerTaskInstancePriorityQueue implements 
TaskPriorityQueue<TaskInst
      */
     @Override
     public TaskInstance take() throws TaskPriorityQueueException {
-        return queue.poll();
+        TaskInstance taskInstance = queue.poll();
+        if (taskInstance != null) {
+            taskInstanceIdSet.remove(taskInstance.getId());
+        }
+        return taskInstance;
     }
 
     /**
@@ -111,6 +114,7 @@ public class PeerTaskInstancePriorityQueue implements 
TaskPriorityQueue<TaskInst
      */
     public void clear() {
         queue.clear();
+        taskInstanceIdSet.clear();
     }
 
     /**
@@ -120,20 +124,11 @@ public class PeerTaskInstancePriorityQueue implements 
TaskPriorityQueue<TaskInst
      * @return true is contains
      */
     public boolean contains(TaskInstance taskInstance) {
-        return this.contains(taskInstance.getTaskCode(), 
taskInstance.getTaskDefinitionVersion());
+        return this.contains(taskInstance.getId());
     }
 
-    public boolean contains(long taskCode, int taskVersion) {
-        Iterator<TaskInstance> iterator = this.queue.iterator();
-        while (iterator.hasNext()) {
-            TaskInstance taskInstance = iterator.next();
-            if (taskCode == taskInstance.getTaskCode()
-                    && taskVersion == taskInstance.getTaskDefinitionVersion()) 
{
-                return true;
-            }
-        }
-        return false;
-
+    public boolean contains(int taskInstanceId) {
+        return taskInstanceIdSet.contains(taskInstanceId);
     }
 
     /**
diff --git 
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java
 
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java
index 8da3a6c194..67e40d1189 100644
--- 
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java
+++ 
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java
@@ -46,14 +46,11 @@ public class PeerTaskInstancePriorityQueueTest {
         Assert.assertTrue(queue.size() < peekBeforeLength);
     }
 
-    @Test
+
+    @Test(expected = TaskPriorityQueueException.class)
     public void poll() throws Exception {
         PeerTaskInstancePriorityQueue queue = 
getPeerTaskInstancePriorityQueue();
-        try {
-            queue.poll(1000, TimeUnit.MILLISECONDS);
-        } catch (TaskPriorityQueueException e) {
-            e.printStackTrace();
-        }
+        queue.poll(1000, TimeUnit.MILLISECONDS);
     }
 
     @Test
diff --git 
a/dolphinscheduler-standalone-server/src/main/resources/application.yaml 
b/dolphinscheduler-standalone-server/src/main/resources/application.yaml
index d968090e46..945cae26bb 100644
--- a/dolphinscheduler-standalone-server/src/main/resources/application.yaml
+++ b/dolphinscheduler-standalone-server/src/main/resources/application.yaml
@@ -109,7 +109,7 @@ master:
   # master prepare execute thread number to limit handle commands in parallel
   pre-exec-threads: 10
   # master execute thread number to limit process instances in parallel
-  exec-threads: 100
+  exec-threads: 10
   # master dispatch task number per batch
   dispatch-task-number: 3
   # master host selector to select a suitable worker, default value: 
LowerWeight. Optional values include random, round_robin, lower_weight
@@ -134,7 +134,7 @@ worker:
   # worker listener port
   listen-port: 1234
   # worker execute thread number to limit task instances in parallel
-  exec-threads: 100
+  exec-threads: 10
   # worker heartbeat interval, the unit is second
   heartbeat-interval: 10
   # worker host weight to dispatch tasks, default value 100
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java
index 94a3ffe43a..441deb495b 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java
@@ -34,6 +34,7 @@ import org.apache.commons.collections4.MapUtils;
 import org.apache.commons.lang3.SystemUtils;
 
 import java.io.File;
+import java.nio.file.FileAlreadyExistsException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.StandardOpenOption;
@@ -147,7 +148,11 @@ public class ShellTask extends AbstractTaskExecutor {
             if (!file.getParentFile().exists()) {
                 file.getParentFile().mkdirs();
             }
-            Files.createFile(path, attr);
+            try {
+                Files.createFile(path, attr);
+            } catch (FileAlreadyExistsException ex) {
+                // this is expected
+            }
         }
 
         Files.write(path, shellParameters.getRawScript().getBytes(), 
StandardOpenOption.APPEND);
diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
index 3eb112bb07..059aa64ce6 100644
--- 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
+++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
@@ -21,9 +21,9 @@ import java.util.Set;
 
 import org.springframework.boot.context.properties.ConfigurationProperties;
 import 
org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.stereotype.Component;
+import org.springframework.context.annotation.Configuration;
 
-@Component
+@Configuration
 @EnableConfigurationProperties
 @ConfigurationProperties("worker")
 public class WorkerConfig {
diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
index 448f62c8c2..8264ea52f0 100644
--- 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
+++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
@@ -163,6 +163,8 @@ public class TaskCallbackService {
                     }
                 }
             });
+        } else {
+            logger.warn("Remote channel of taskInstanceId is null: {}, cannot 
send command: {}", taskInstanceId, command);
         }
     }
 
diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
index 48c590cbd7..df75caa3bb 100644
--- 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
+++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
@@ -19,12 +19,15 @@ package org.apache.dolphinscheduler.server.worker.runner;
 
 import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
 
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.WarningType;
 import 
org.apache.dolphinscheduler.common.exception.StorageOperateNoConfiguredException;
 import org.apache.dolphinscheduler.common.storage.StorageOperate;
-import org.apache.dolphinscheduler.common.utils.*;
+import org.apache.dolphinscheduler.common.utils.CommonUtils;
+import org.apache.dolphinscheduler.common.utils.DateUtils;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.LoggerUtils;
+import org.apache.dolphinscheduler.common.utils.PropertyUtils;
 import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
 import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
@@ -39,17 +42,26 @@ import 
org.apache.dolphinscheduler.service.exceptions.ServiceException;
 import org.apache.dolphinscheduler.service.task.TaskPluginManager;
 
 import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang3.tuple.Pair;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.*;
+import java.nio.file.NoSuchFileException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.Delayed;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
-import com.google.common.base.Strings;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Strings;
+
 /**
  * task scheduler thread
  */
@@ -232,7 +244,11 @@ public class TaskExecuteThread implements Runnable, 
Delayed {
                 org.apache.commons.io.FileUtils.deleteDirectory(new 
File(execLocalPath));
                 logger.info("exec local path: {} cleared.", execLocalPath);
             } catch (IOException e) {
-                logger.error("delete exec dir failed : {}", e.getMessage(), e);
+                if (e instanceof NoSuchFileException) {
+                    // this is expected
+                } else {
+                    logger.error("Delete exec dir failed.", e);
+                }
             }
         }
     }
@@ -263,7 +279,7 @@ public class TaskExecuteThread implements Runnable, Delayed 
{
                 task.cancelApplication(true);
                 ProcessUtils.killYarnJob(taskExecutionContext);
             } catch (Exception e) {
-                logger.error(e.getMessage(), e);
+                logger.error("Kill task failed", e);
             }
         }
     }

Reply via email to