This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch 3.0.0-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git

commit 3b923e5933bf3f97d73849feef89e535635e4366
Author: Wenjun Ruan <[email protected]>
AuthorDate: Mon Jul 4 22:08:15 2022 +0800

    [Fix-10666] A workflow that fails to submit stays in memory and is never retried 
(#10667)
    
    * A workflow that fails to submit stays in memory and is never retried
    
    (cherry picked from commit 35a10d092f566c07137da5fb67b21cee644cdc8f)
---
 .../server/master/exception/MasterException.java   |  29 +++
 .../master/metrics/ProcessInstanceMetrics.java     |   6 +
 .../master/runner/MasterSchedulerService.java      | 159 ++++++++++------
 .../master/runner/StateWheelExecuteThread.java     |  18 +-
 .../master/runner/WorkflowExecuteRunnable.java     | 199 ++++++++++-----------
 .../server/master/runner/WorkflowSubmitStatue.java |  34 ++++
 .../WorkflowExecuteRunnableTest.java}              |  19 +-
 .../service/process/ProcessService.java            |   2 +-
 8 files changed, 296 insertions(+), 170 deletions(-)

diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/exception/MasterException.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/exception/MasterException.java
new file mode 100644
index 0000000000..eb5d5e8dfe
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/exception/MasterException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.exception;
+
+public class MasterException extends Exception {
+
+    public MasterException(String message) {
+        super(message);
+    }
+
+    public MasterException(String message, Throwable throwable) {
+        super(message, throwable);
+    }
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/ProcessInstanceMetrics.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/ProcessInstanceMetrics.java
index 957a3ecb7b..2845d2a637 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/ProcessInstanceMetrics.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/ProcessInstanceMetrics.java
@@ -90,6 +90,12 @@ public final class ProcessInstanceMetrics {
             .register(Metrics.globalRegistry);
     }
 
+    public static synchronized void 
registerProcessInstanceResubmitGauge(Supplier<Number> function) {
+        Gauge.builder("ds.workflow.instance.resubmit", function)
+            .description("The current process instance need to resubmit count")
+            .register(Metrics.globalRegistry);
+    }
+
     public static void incProcessInstanceSubmit() {
         PROCESS_INSTANCE_SUBMIT_COUNTER.increment();
     }
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
index 1d0071a4dd..ae7b4bdeb0 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
@@ -27,11 +27,10 @@ import org.apache.dolphinscheduler.common.utils.NetUtils;
 import org.apache.dolphinscheduler.common.utils.OSUtils;
 import org.apache.dolphinscheduler.dao.entity.Command;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
-import org.apache.dolphinscheduler.remote.NettyRemotingClient;
-import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
 import 
org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import 
org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
+import org.apache.dolphinscheduler.server.master.exception.MasterException;
 import org.apache.dolphinscheduler.server.master.metrics.MasterServerMetrics;
 import 
org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics;
 import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;
@@ -43,7 +42,9 @@ import org.apache.commons.collections4.CollectionUtils;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 
 import org.slf4j.Logger;
@@ -51,6 +52,8 @@ import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
+import lombok.NonNull;
+
 /**
  * Master scheduler thread, this thread will consume the commands from 
database and trigger processInstance executed.
  */
@@ -71,8 +74,6 @@ public class MasterSchedulerService extends BaseDaemonThread {
     @Autowired
     private ProcessAlertManager processAlertManager;
 
-    private NettyRemotingClient nettyRemotingClient;
-
     @Autowired
     private NettyExecutorManager nettyExecutorManager;
 
@@ -93,6 +94,10 @@ public class MasterSchedulerService extends BaseDaemonThread 
{
     @Autowired
     private StateWheelExecuteThread stateWheelExecuteThread;
 
+    private final LinkedBlockingQueue<ProcessInstance> 
submitFailedProcessInstances = new LinkedBlockingQueue<>();
+
+    private Thread failedProcessInstanceResubmitThread;
+
     private String masterAddress;
 
     protected MasterSchedulerService() {
@@ -104,22 +109,23 @@ public class MasterSchedulerService extends 
BaseDaemonThread {
      */
     public void init() {
         this.masterPrepareExecService = (ThreadPoolExecutor) 
ThreadUtils.newDaemonFixedThreadExecutor("MasterPreExecThread", 
masterConfig.getPreExecThreads());
-        NettyClientConfig clientConfig = new NettyClientConfig();
-        this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
         this.masterAddress = NetUtils.getAddr(masterConfig.getListenPort());
+        this.failedProcessInstanceResubmitThread = new 
FailedProcessInstanceResubmitThread(submitFailedProcessInstances);
+        
ProcessInstanceMetrics.registerProcessInstanceResubmitGauge(submitFailedProcessInstances::size);
     }
 
     @Override
     public synchronized void start() {
         logger.info("Master schedule service starting..");
-        this.stateWheelExecuteThread.start();
         super.start();
+        this.failedProcessInstanceResubmitThread.start();
         logger.info("Master schedule service started...");
     }
 
     public void close() {
         logger.info("Master schedule service stopping...");
-        nettyRemotingClient.close();
+        // these process instances will be failed over, so it is safe to clear them here
+        submitFailedProcessInstances.clear();
         logger.info("Master schedule service stopped...");
     }
 
@@ -142,7 +148,9 @@ public class MasterSchedulerService extends 
BaseDaemonThread {
                 Thread.currentThread().interrupt();
                 break;
             } catch (Exception e) {
-                logger.error("Master schedule service loop command error", e);
+                logger.error("Master schedule workflow error", e);
+                // sleep for 1s to avoid a burst of exceptions while the database is down
+                ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
             }
         }
     }
@@ -150,7 +158,7 @@ public class MasterSchedulerService extends 
BaseDaemonThread {
     /**
      * Query command from database by slot, and transform to workflow 
instance, then submit to workflowExecuteThreadPool.
      */
-    private void scheduleWorkflow() throws InterruptedException {
+    private void scheduleWorkflow() throws InterruptedException, 
MasterException {
         List<Command> commands = findCommands();
         if (CollectionUtils.isEmpty(commands)) {
             // indicate that no command ,sleep for 1s
@@ -160,37 +168,52 @@ public class MasterSchedulerService extends 
BaseDaemonThread {
 
         List<ProcessInstance> processInstances = 
command2ProcessInstance(commands);
         if (CollectionUtils.isEmpty(processInstances)) {
+            // indicates that transforming the commands into process instances failed, sleep for 1s
+            Thread.sleep(Constants.SLEEP_TIME_MILLIS);
             return;
         }
         MasterServerMetrics.incMasterConsumeCommand(commands.size());
 
         for (ProcessInstance processInstance : processInstances) {
-            try {
-                LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
-                logger.info("Master schedule service starting workflow 
instance");
-                final WorkflowExecuteRunnable workflowExecuteRunnable = new 
WorkflowExecuteRunnable(
-                    processInstance
-                    , processService
-                    , nettyExecutorManager
-                    , processAlertManager
-                    , masterConfig
-                    , stateWheelExecuteThread);
-
-                
this.processInstanceExecCacheManager.cache(processInstance.getId(), 
workflowExecuteRunnable);
-                if (processInstance.getTimeout() > 0) {
-                    
stateWheelExecuteThread.addProcess4TimeoutCheck(processInstance);
-                }
-                ProcessInstanceMetrics.incProcessInstanceSubmit();
-                workflowExecuteThreadPool.submit(workflowExecuteRunnable);
-                logger.info("Master schedule service started workflow 
instance");
-
-            } catch (Exception ex) {
-                
processInstanceExecCacheManager.removeByProcessInstanceId(processInstance.getId());
-                
stateWheelExecuteThread.removeProcess4TimeoutCheck(processInstance.getId());
-                logger.info("Master submit workflow to thread pool failed, 
will remove workflow runnable from cache manager", ex);
-            } finally {
-                LoggerUtils.removeWorkflowInstanceIdMDC();
+            submitProcessInstance(processInstance);
+        }
+    }
+
+    private void submitProcessInstance(@NonNull ProcessInstance 
processInstance) {
+        try {
+            LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
+            logger.info("Master schedule service starting workflow instance");
+            final WorkflowExecuteRunnable workflowExecuteRunnable = new 
WorkflowExecuteRunnable(
+                processInstance
+                , processService
+                , nettyExecutorManager
+                , processAlertManager
+                , masterConfig
+                , stateWheelExecuteThread);
+
+            
this.processInstanceExecCacheManager.cache(processInstance.getId(), 
workflowExecuteRunnable);
+            if (processInstance.getTimeout() > 0) {
+                
stateWheelExecuteThread.addProcess4TimeoutCheck(processInstance);
             }
+            ProcessInstanceMetrics.incProcessInstanceSubmit();
+            CompletableFuture<WorkflowSubmitStatue> workflowSubmitFuture = 
CompletableFuture.supplyAsync(
+                workflowExecuteRunnable::call, workflowExecuteThreadPool);
+            workflowSubmitFuture.thenAccept(workflowSubmitStatue -> {
+                if (WorkflowSubmitStatue.FAILED == workflowSubmitStatue) {
+                    // submit failed
+                    
processInstanceExecCacheManager.removeByProcessInstanceId(processInstance.getId());
+                    
stateWheelExecuteThread.removeProcess4TimeoutCheck(processInstance.getId());
+                    submitFailedProcessInstances.add(processInstance);
+                }
+            });
+            logger.info("Master schedule service started workflow instance");
+
+        } catch (Exception ex) {
+            
processInstanceExecCacheManager.removeByProcessInstanceId(processInstance.getId());
+            
stateWheelExecuteThread.removeProcess4TimeoutCheck(processInstance.getId());
+            logger.info("Master submit workflow to thread pool failed, will 
remove workflow runnable from cache manager", ex);
+        } finally {
+            LoggerUtils.removeWorkflowInstanceIdMDC();
         }
     }
 
@@ -232,23 +255,27 @@ public class MasterSchedulerService extends 
BaseDaemonThread {
         return processInstances;
     }
 
-    private List<Command> findCommands() {
-        long scheduleStartTime = System.currentTimeMillis();
-        int thisMasterSlot = ServerNodeManager.getSlot();
-        int masterCount = ServerNodeManager.getMasterSize();
-        if (masterCount <= 0) {
-            logger.warn("Master count: {} is invalid, the current slot: {}", 
masterCount, thisMasterSlot);
-            return Collections.emptyList();
-        }
-        int pageNumber = 0;
-        int pageSize = masterConfig.getFetchCommandNum();
-        final List<Command> result = 
processService.findCommandPageBySlot(pageSize, pageNumber, masterCount, 
thisMasterSlot);
-        if (CollectionUtils.isNotEmpty(result)) {
-            logger.info("Master schedule service loop command success, command 
size: {}, current slot: {}, total slot size: {}",
-                result.size(), thisMasterSlot, masterCount);
+    private List<Command> findCommands() throws MasterException {
+        try {
+            long scheduleStartTime = System.currentTimeMillis();
+            int thisMasterSlot = ServerNodeManager.getSlot();
+            int masterCount = ServerNodeManager.getMasterSize();
+            if (masterCount <= 0) {
+                logger.warn("Master count: {} is invalid, the current slot: 
{}", masterCount, thisMasterSlot);
+                return Collections.emptyList();
+            }
+            int pageNumber = 0;
+            int pageSize = masterConfig.getFetchCommandNum();
+            final List<Command> result = 
processService.findCommandPageBySlot(pageSize, pageNumber, masterCount, 
thisMasterSlot);
+            if (CollectionUtils.isNotEmpty(result)) {
+                logger.info("Master schedule service loop command success, 
command size: {}, current slot: {}, total slot size: {}",
+                    result.size(), thisMasterSlot, masterCount);
+            }
+            
ProcessInstanceMetrics.recordCommandQueryTime(System.currentTimeMillis() - 
scheduleStartTime);
+            return result;
+        } catch (Exception ex) {
+            throw new MasterException("Master loop command from database 
error", ex);
         }
-        
ProcessInstanceMetrics.recordCommandQueryTime(System.currentTimeMillis() - 
scheduleStartTime);
-        return result;
     }
 
     private SlotCheckState slotCheck(Command command) {
@@ -265,4 +292,34 @@ public class MasterSchedulerService extends 
BaseDaemonThread {
         return state;
     }
 
+    private class FailedProcessInstanceResubmitThread extends Thread {
+
+        private final LinkedBlockingQueue<ProcessInstance> 
submitFailedProcessInstances;
+
+        public 
FailedProcessInstanceResubmitThread(LinkedBlockingQueue<ProcessInstance> 
submitFailedProcessInstances) {
+            logger.info("Starting workflow resubmit thread");
+            this.submitFailedProcessInstances = submitFailedProcessInstances;
+            this.setDaemon(true);
+            this.setName("SubmitFailedProcessInstanceHandleThread");
+            logger.info("Started workflow resubmit thread");
+        }
+
+        @Override
+        public void run() {
+            while (Stopper.isRunning()) {
+                try {
+                    ProcessInstance processInstance = 
submitFailedProcessInstances.take();
+                    submitProcessInstance(processInstance);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    logger.warn("SubmitFailedProcessInstanceHandleThread has 
been interrupted, will return");
+                    break;
+                }
+
+                // avoid a fast-failing resubmit loop driving up CPU usage
+                ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
+            }
+        }
+    }
+
 }
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
index 8b92696723..a56a7d8c5a 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
@@ -33,12 +33,11 @@ import 
org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheM
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.master.runner.task.TaskInstanceKey;
 
-import org.apache.commons.lang3.ThreadUtils;
-
-import java.time.Duration;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
+import javax.annotation.PostConstruct;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -91,9 +90,14 @@ public class StateWheelExecuteThread extends 
BaseDaemonThread {
         super("StateWheelExecuteThread");
     }
 
+    @PostConstruct
+    public void startWheelThread() {
+        super.start();
+    }
+
     @Override
     public void run() {
-        Duration checkInterval = masterConfig.getStateWheelInterval();
+        final long checkInterval = 
masterConfig.getStateWheelInterval().toMillis();
         while (Stopper.isRunning()) {
             try {
                 checkTask4Timeout();
@@ -104,9 +108,11 @@ public class StateWheelExecuteThread extends 
BaseDaemonThread {
                 logger.error("state wheel thread check error:", e);
             }
             try {
-                ThreadUtils.sleep(checkInterval);
+                Thread.sleep(checkInterval);
             } catch (InterruptedException e) {
-                logger.error("state wheel thread sleep error", e);
+                logger.error("state wheel thread sleep error, will close the 
loop", e);
+                Thread.currentThread().interrupt();
+                break;
             }
         }
     }
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index 7676367a18..88d7391dde 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -81,12 +81,14 @@ import 
org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
 import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue;
 
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.ArrayUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.math.NumberUtils;
 
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -96,30 +98,29 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Lists;
 
+import lombok.NonNull;
+
 /**
  * Workflow execute task, used to execute a workflow instance.
  */
-public class WorkflowExecuteRunnable implements Runnable {
+public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> 
{
 
     /**
      * logger of WorkflowExecuteThread
      */
     private static final Logger logger = 
LoggerFactory.getLogger(WorkflowExecuteRunnable.class);
 
-    /**
-     * master config
-     */
-    private final MasterConfig masterConfig;
-
     /**
      * process service
      */
@@ -151,14 +152,14 @@ public class WorkflowExecuteRunnable implements Runnable {
     private DAG<String, TaskNode, TaskNodeRelation> dag;
 
     /**
-     * key of workflow
+     * unique key of workflow
      */
     private String key;
 
     /**
      * start flag, true: start nodes submit completely
      */
-    private boolean isStart = false;
+    private volatile boolean isStart = false;
 
     /**
      * submit failure nodes
@@ -235,6 +236,8 @@ public class WorkflowExecuteRunnable implements Runnable {
      */
     private final StateWheelExecuteThread stateWheelExecuteThread;
 
+    private final String masterAddress;
+
     /**
      * @param processInstance         processInstance
      * @param processService          processService
@@ -243,19 +246,19 @@ public class WorkflowExecuteRunnable implements Runnable {
      * @param masterConfig            masterConfig
      * @param stateWheelExecuteThread stateWheelExecuteThread
      */
-    public WorkflowExecuteRunnable(ProcessInstance processInstance
-            , ProcessService processService
-            , NettyExecutorManager nettyExecutorManager
-            , ProcessAlertManager processAlertManager
-            , MasterConfig masterConfig
-            , StateWheelExecuteThread stateWheelExecuteThread) {
+    public WorkflowExecuteRunnable(@NonNull ProcessInstance processInstance,
+                                   @NonNull ProcessService processService,
+                                   @NonNull NettyExecutorManager 
nettyExecutorManager,
+                                   @NonNull ProcessAlertManager 
processAlertManager,
+                                   @NonNull MasterConfig masterConfig,
+                                   @NonNull StateWheelExecuteThread 
stateWheelExecuteThread) {
         this.processService = processService;
         this.processInstance = processInstance;
-        this.masterConfig = masterConfig;
         this.nettyExecutorManager = nettyExecutorManager;
         this.processAlertManager = processAlertManager;
         this.stateWheelExecuteThread = stateWheelExecuteThread;
-        TaskMetrics.registerTaskRunning(readyToSubmitTaskQueue::size);
+        this.masterAddress = NetUtils.getAddr(masterConfig.getListenPort());
+        TaskMetrics.registerTaskPrepared(readyToSubmitTaskQueue::size);
     }
 
     /**
@@ -280,6 +283,7 @@ public class WorkflowExecuteRunnable implements Runnable {
                     this.stateEvents.remove(stateEvent);
                 }
             } catch (Exception e) {
+                // we catch the exception here, since if handling the state event fails, the state event is still kept in the stateEvents queue.
                 logger.error("state handle error:", e);
             } finally {
                 LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
@@ -464,6 +468,7 @@ public class WorkflowExecuteRunnable implements Runnable {
 
         if (taskInstance.getState().typeIsSuccess()) {
             completeTaskMap.put(taskInstance.getTaskCode(), 
taskInstance.getId());
+            // todo: merge the last taskInstance
             processInstance.setVarPool(taskInstance.getVarPool());
             processService.saveProcessInstance(processInstance);
             if (!processInstance.isBlocked()) {
@@ -822,18 +827,24 @@ public class WorkflowExecuteRunnable implements Runnable {
      * ProcessInstance start entrypoint.
      */
     @Override
-    public void run() {
+    public WorkflowSubmitStatue call() {
         if (this.taskInstanceMap.size() > 0 || isStart) {
             logger.warn("The workflow has already been started");
-            return;
+            return WorkflowSubmitStatue.DUPLICATED_SUBMITTED;
         }
+
         try {
+            LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
             buildFlowDag();
             initTaskQueue();
             submitPostNode(null);
             isStart = true;
+            return WorkflowSubmitStatue.SUCCESS;
         } catch (Exception e) {
-            logger.error("start process error, process instance id:{}", 
processInstance.getId(), e);
+            logger.error("Start workflow error", e);
+            return WorkflowSubmitStatue.FAILED;
+        } finally {
+            LoggerUtils.removeWorkflowInstanceIdMDC();
         }
     }
 
@@ -883,7 +894,7 @@ public class WorkflowExecuteRunnable implements Runnable {
     }
 
     /**
-     * generate process dag
+     * Generate process dag
      *
      * @throws Exception exception
      */
@@ -895,7 +906,7 @@ public class WorkflowExecuteRunnable implements Runnable {
                                                                  
processInstance.getProcessDefinitionVersion());
         processInstance.setProcessDefinition(processDefinition);
 
-        List<TaskInstance> recoverNodeList = 
getStartTaskInstanceList(processInstance.getCommandParam());
+        List<TaskInstance> recoverNodeList = 
getRecoverTaskInstanceList(processInstance.getCommandParam());
 
         List<ProcessTaskRelation> processTaskRelations = 
processService.findRelationByCode(processDefinition.getCode(), 
processDefinition.getVersion());
         List<TaskDefinitionLog> taskDefinitionLogs = 
processService.getTaskDefineLogListByRelation(processTaskRelations);
@@ -990,7 +1001,8 @@ public class WorkflowExecuteRunnable implements Runnable {
                 if (complementListDate.isEmpty() && needComplementProcess()) {
                     complementListDate = CronUtils.getSelfFireDateList(start, 
end, schedules);
                     logger.info(" process definition code:{} complement data: 
{}",
-                        processInstance.getProcessDefinitionCode(), 
complementListDate.toString());
+                                processInstance.getProcessDefinitionCode(),
+                                complementListDate.toString());
 
                     if (!complementListDate.isEmpty() && Flag.NO == 
processInstance.getIsSubProcess()) {
                         
processInstance.setScheduleTime(complementListDate.get(0));
@@ -1082,7 +1094,7 @@ public class WorkflowExecuteRunnable implements Runnable {
 
         try {
             HostUpdateCommand hostUpdateCommand = new HostUpdateCommand();
-            
hostUpdateCommand.setProcessHost(NetUtils.getAddr(masterConfig.getListenPort()));
+            hostUpdateCommand.setProcessHost(masterAddress);
             hostUpdateCommand.setTaskInstanceId(taskInstance.getId());
             Host host = new Host(taskInstance.getHost());
             nettyExecutorManager.doExecute(host, 
hostUpdateCommand.convert2Command());
@@ -1843,105 +1855,84 @@ public class WorkflowExecuteRunnable implements 
Runnable {
      * handling the list of tasks to be submitted
      */
     private void submitStandByTask() {
-        try {
-            int length = readyToSubmitTaskQueue.size();
-            for (int i = 0; i < length; i++) {
-                TaskInstance task = readyToSubmitTaskQueue.peek();
-                if (task == null) {
+        int length = readyToSubmitTaskQueue.size();
+        for (int i = 0; i < length; i++) {
+            TaskInstance task = readyToSubmitTaskQueue.peek();
+            if (task == null) {
+                continue;
+            }
+            // stop tasks which is retrying if forced success happens
+            if (task.taskCanRetry()) {
+                TaskInstance retryTask = 
processService.findTaskInstanceById(task.getId());
+                if (retryTask != null && 
retryTask.getState().equals(ExecutionStatus.FORCED_SUCCESS)) {
+                    task.setState(retryTask.getState());
+                    logger.info("task: {} has been forced success, put it into 
complete task list and stop retrying",
+                                task.getName());
+                    removeTaskFromStandbyList(task);
+                    completeTaskMap.put(task.getTaskCode(), task.getId());
+                    taskInstanceMap.put(task.getId(), task);
+                    submitPostNode(Long.toString(task.getTaskCode()));
                     continue;
                 }
-                // stop tasks which is retrying if forced success happens
-                if (task.taskCanRetry()) {
-                    TaskInstance retryTask = 
processService.findTaskInstanceById(task.getId());
-                    if (retryTask != null && 
retryTask.getState().equals(ExecutionStatus.FORCED_SUCCESS)) {
-                        task.setState(retryTask.getState());
-                        logger.info("task: {} has been forced success, put it 
into complete task list and stop retrying", task.getName());
-                        removeTaskFromStandbyList(task);
-                        completeTaskMap.put(task.getTaskCode(), task.getId());
-                        taskInstanceMap.put(task.getId(), task);
-                        submitPostNode(Long.toString(task.getTaskCode()));
-                        continue;
-                    }
-                }
-                //init varPool only this task is the first time running
-                if (task.isFirstRun()) {
-                    //get pre task ,get all the task varPool to this task
-                    Set<String> preTask = 
dag.getPreviousNodes(Long.toString(task.getTaskCode()));
-                    getPreVarPool(task, preTask);
-                }
-                DependResult dependResult = getDependResultForTask(task);
-                if (DependResult.SUCCESS == dependResult) {
-                    Optional<TaskInstance> taskInstanceOptional = 
submitTaskExec(task);
-                    if (!taskInstanceOptional.isPresent()) {
-                        this.taskFailedSubmit = true;
-                        // Remove and add to complete map and error map
-                        removeTaskFromStandbyList(task);
-                        completeTaskMap.put(task.getTaskCode(), task.getId());
-                        errorTaskMap.put(task.getTaskCode(), task.getId());
-                        logger.error("Task submitted failed, 
processInstanceId: {}, taskInstanceId: {}",
-                                     task.getProcessInstanceId(),
-                                     task.getId());
-                    } else {
-                        removeTaskFromStandbyList(task);
-                    }
-                } else if (DependResult.FAILED == dependResult) {
-                    // if the dependency fails, the current node is not 
submitted and the state changes to failure.
-                    dependFailedTaskMap.put(task.getTaskCode(), task.getId());
+            }
+            //init varPool only this task is the first time running
+            if (task.isFirstRun()) {
+                //get pre task ,get all the task varPool to this task
+                Set<String> preTask = 
dag.getPreviousNodes(Long.toString(task.getTaskCode()));
+                getPreVarPool(task, preTask);
+            }
+            DependResult dependResult = getDependResultForTask(task);
+            if (DependResult.SUCCESS == dependResult) {
+                Optional<TaskInstance> taskInstanceOptional = 
submitTaskExec(task);
+                if (!taskInstanceOptional.isPresent()) {
+                    this.taskFailedSubmit = true;
+                    // Remove and add to complete map and error map
                     removeTaskFromStandbyList(task);
-                    logger.info("Task dependent result is failed, 
taskInstanceId:{} depend result : {}",
-                                task.getId(),
-                                dependResult);
-                } else if (DependResult.NON_EXEC == dependResult) {
-                    // for some reasons(depend task pause/stop) this task 
would not be submit
+                    completeTaskMap.put(task.getTaskCode(), task.getId());
+                    errorTaskMap.put(task.getTaskCode(), task.getId());
+                    logger.error("Task submitted failed, processInstanceId: 
{}, taskInstanceId: {}",
+                                 task.getProcessInstanceId(),
+                                 task.getId());
+                } else {
                     removeTaskFromStandbyList(task);
-                    logger.info("Remove task due to depend result not 
executed, taskInstanceId:{} depend result : {}",
-                                task.getId(),
-                                dependResult);
                 }
+            } else if (DependResult.FAILED == dependResult) {
+                // if the dependency fails, the current node is not submitted 
and the state changes to failure.
+                dependFailedTaskMap.put(task.getTaskCode(), task.getId());
+                removeTaskFromStandbyList(task);
+                logger.info("Task dependent result is failed, 
taskInstanceId:{} depend result : {}",
+                            task.getId(),
+                            dependResult);
+            } else if (DependResult.NON_EXEC == dependResult) {
+                // for some reasons(depend task pause/stop) this task would 
not be submit
+                removeTaskFromStandbyList(task);
+                logger.info("Remove task due to depend result not executed, 
taskInstanceId:{} depend result : {}",
+                            task.getId(),
+                            dependResult);
             }
-        } catch (Exception e) {
-            logger.error("submit standby task error", e);
         }
     }
 
     /**
-     * get recovery task instance list
-     *
-     * @param taskIdArray task id array
-     * @return recovery task instance list
-     */
-    private List<TaskInstance> getRecoverTaskInstanceList(String[] 
taskIdArray) {
-        if (taskIdArray == null || taskIdArray.length == 0) {
-            return new ArrayList<>();
-        }
-        List<Integer> taskIdList = new ArrayList<>(taskIdArray.length);
-        for (String taskId : taskIdArray) {
-            try {
-                Integer id = Integer.valueOf(taskId);
-                taskIdList.add(id);
-            } catch (Exception e) {
-                logger.error("get recovery task instance failed ", e);
-            }
-        }
-        return processService.findTaskInstanceByIdList(taskIdList);
-    }
-
-    /**
-     * get start task instance list
+     * Get start task instance list from recover
      *
      * @param cmdParam command param
      * @return task instance list
      */
-    private List<TaskInstance> getStartTaskInstanceList(String cmdParam) {
-
-        List<TaskInstance> instanceList = new ArrayList<>();
+    protected List<TaskInstance> getRecoverTaskInstanceList(String cmdParam) {
         Map<String, String> paramMap = JSONUtils.toMap(cmdParam);
 
+        // todo: Can we use a better way to set the recover taskInstanceId 
list? rather then use the cmdParam
         if (paramMap != null && 
paramMap.containsKey(CMD_PARAM_RECOVERY_START_NODE_STRING)) {
-            String[] idList = 
paramMap.get(CMD_PARAM_RECOVERY_START_NODE_STRING).split(COMMA);
-            instanceList = getRecoverTaskInstanceList(idList);
+            String[] idList = 
paramMap.get(CMD_PARAM_RECOVERY_START_NODE_STRING).split(Constants.COMMA);
+            if (ArrayUtils.isNotEmpty(idList)) {
+                List<Integer> taskInstanceIds = Arrays.stream(idList)
+                    .map(Integer::valueOf)
+                    .collect(Collectors.toList());
+                return 
processService.findTaskInstanceByIdList(taskInstanceIds);
+            }
         }
-        return instanceList;
+        return Collections.emptyList();
     }
 
     /**
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowSubmitStatue.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowSubmitStatue.java
new file mode 100644
index 0000000000..b53a500c89
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowSubmitStatue.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.runner;
+
+public enum WorkflowSubmitStatue {
+    /**
+     * Submit success
+     */
+    SUCCESS,
+    /**
+     * Submit failed, this status should be retry
+     */
+    FAILED,
+    /**
+     * Duplicated submitted, this status should never occur.
+     */
+    DUPLICATED_SUBMITTED,
+    ;
+}
diff --git 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteTaskTest.java
 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java
similarity index 93%
rename from 
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteTaskTest.java
rename to 
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java
index f2d263f699..bc3051ae14 100644
--- 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteTaskTest.java
+++ 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java
@@ -15,12 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.server.master;
+package org.apache.dolphinscheduler.server.master.runner;
 
 import static 
org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
 import static 
org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
 import static 
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING;
 import static 
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES;
+
 import static org.powermock.api.mockito.PowerMockito.mock;
 
 import org.apache.dolphinscheduler.common.enums.CommandType;
@@ -35,9 +36,8 @@ import org.apache.dolphinscheduler.dao.entity.Schedule;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
-import 
org.apache.dolphinscheduler.server.master.runner.StateWheelExecuteThread;
-import 
org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
-import 
org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory;
+import 
org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
+import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 
@@ -70,7 +70,7 @@ import org.springframework.context.ApplicationContext;
  */
 @RunWith(PowerMockRunner.class)
 @PrepareForTest({WorkflowExecuteRunnable.class})
-public class WorkflowExecuteTaskTest {
+public class WorkflowExecuteRunnableTest {
 
     private WorkflowExecuteRunnable workflowExecuteThread;
 
@@ -113,7 +113,10 @@ public class WorkflowExecuteTaskTest {
         
Mockito.when(processInstance.getProcessDefinition()).thenReturn(processDefinition);
 
         stateWheelExecuteThread = mock(StateWheelExecuteThread.class);
-        workflowExecuteThread = PowerMockito.spy(new 
WorkflowExecuteRunnable(processInstance, processService, null, null, config, 
stateWheelExecuteThread));
+        NettyExecutorManager nettyExecutorManager = 
mock(NettyExecutorManager.class);
+        ProcessAlertManager processAlertManager = 
mock(ProcessAlertManager.class);
+        workflowExecuteThread =
+            PowerMockito.spy(new WorkflowExecuteRunnable(processInstance, 
processService, nettyExecutorManager, processAlertManager, config, 
stateWheelExecuteThread));
         // prepareProcess init dag
         Field dag = WorkflowExecuteRunnable.class.getDeclaredField("dag");
         dag.setAccessible(true);
@@ -154,9 +157,9 @@ public class WorkflowExecuteTaskTest {
                     Arrays.asList(taskInstance1.getId(), 
taskInstance2.getId(), taskInstance3.getId(), taskInstance4.getId()))
             ).thenReturn(Arrays.asList(taskInstance1, taskInstance2, 
taskInstance3, taskInstance4));
             Class<WorkflowExecuteRunnable> masterExecThreadClass = 
WorkflowExecuteRunnable.class;
-            Method method = 
masterExecThreadClass.getDeclaredMethod("getStartTaskInstanceList", 
String.class);
+            Method method = 
masterExecThreadClass.getDeclaredMethod("getRecoverTaskInstanceList", 
String.class);
             method.setAccessible(true);
-            List<TaskInstance> taskInstances = (List<TaskInstance>) 
method.invoke(workflowExecuteThread, JSONUtils.toJsonString(cmdParam));
+            List<TaskInstance> taskInstances = 
workflowExecuteThread.getRecoverTaskInstanceList(JSONUtils.toJsonString(cmdParam));
             Assert.assertEquals(4, taskInstances.size());
 
             cmdParam.put(CMD_PARAM_RECOVERY_START_NODE_STRING, "1");
diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index 588fb59422..4bfb0b4809 100644
--- 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -81,7 +81,7 @@ public interface ProcessService {
 
     ProcessDefinition findProcessDefineById(int processDefinitionId);
 
-    ProcessDefinition findProcessDefinition(Long processDefinitionCode, int 
version);
+    ProcessDefinition findProcessDefinition(Long processDefinitionCode, int 
processDefinitionVersion);
 
     ProcessDefinition findProcessDefinitionByCode(Long processDefinitionCode);
 

Reply via email to