This is an automated email from the ASF dual-hosted git repository.

journey pushed a commit to branch refactor-worker
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/refactor-worker by this push:
     new 052f9d1  UT Coverage rate test (#2276)
052f9d1 is described below

commit 052f9d10bf9507ab68c32ca03441a8e6bf3db991
Author: qiaozhanwei <[email protected]>
AuthorDate: Mon Mar 23 14:44:17 2020 +0800

    UT Coverage rate test (#2276)
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    * TaskExecutionContext create modify
    
    * buildAckCommand taskInstanceId not set modify
    
    * java doc error modify
    
    * add comment
    
    * ExecutorManager interface add generic type
    
    * add TaskInstanceCacheManager receive Worker report result
    
    * TaskInstance setExecutePath
    
    * add TaskInstanceCacheManager to receive Worker Task result report
    
    * TaskInstanceCacheManager add remove method
    
    * add license
    
    * add dispatch task method
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * taskInstanceCache is null ,need load from db
    
    * taskInstanceCache is null ,need load from db
    
    * taskInstanceCache is null ,need load from db
    
    * 1,worker TaskProps replaced with TaskExecutionContext
    2,Master kill Task , KillTaskProcessor modify
    
    * worker remove db
    
    * ShellTask modify
    
    * master persistence processId and appIds
    
    * master persistence processId and appIds
    
    * master add kill task logic
    
    * master add kill task logic
    
    * master add kill task logic
    
    * javadoc error modify
    
    * remove chinese log
    
    * executeDirectly method add Override
    
    * remote module modify
    
    * TaskKillResponseProcessor command type modify
    
    * create buildKillCommand
    
    * host add host:port format
    
    * host add host:port format
    
    * TaskAckProcessor modify
    
    * TaskAckProcessor modify
    
    * task priority refactor
    
    * remove ITaskQueue
    
    * task priority refactor
    
    * remove ITaskQueue
    
    * TaskPriority refactor
    
    * remove logs
    
    * WorkerServer refactor
    
    * MasterSchedulerService modify
    
    * WorkerConfig listen port modify
    
    * modify master and worker listen port
    
    * cancelTaskInstance set TaskExecutionContext host,logPath,executePath
    
    * cancelTaskInstance set TaskExecutionContext host,logPath,executePath
    
    * Encapsulate the parameters required by sqltask
    
    * 1,Encapsulate the parameters required by sqltask
    2,SQLTask optimization
    
    * AbstractTask modify
    
    * ProcedureTask optimization
    
    * MasterSchedulerService modify
    
    * TaskUpdateQueueConsumer modify
    
    * test
    
    * DataxTask process run debug
    
    * DataxTask process run debug
    
    * add protobuf dependency,MR、Spark task etc need this
    
    * TaskUpdateQueueConsumer modify
    
    * TaskExecutionContextBuilder set TaskInstance workgroup
    
    * WorkerGroupService queryAllGroup modify
    query available work group
    
    * 1,get workergroup from zk modify
    2,SpringConnectionFactory repeat load modify
    
    * master and worker register ip  use OSUtils.getHost()
    
    * ProcessInstance host set ip:port format
    
    * worker fault tolerance modify
    
    * Constants and .env modify
    
    * master fault tolerant bug modify
    
    * UT add pom.xml
    
    Co-authored-by: qiaozhanwei <[email protected]>
---
 .../apache/dolphinscheduler/common/Constants.java  |  2 +-
 .../server/master/runner/MasterExecThread.java     | 37 ++++++++++++++++++++--
 pom.xml                                            | 36 +++++++++++++++++----
 3 files changed, 65 insertions(+), 10 deletions(-)

diff --git 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
index 2aded0f..1636072 100644
--- 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
+++ 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
@@ -121,7 +121,7 @@ public final class Constants {
     /**
      * MasterServer directory registered in zookeeper
      */
-    public static final String ZOOKEEPER_DOLPHINSCHEDULER_MASTERS = 
"/nodes/masters";
+    public static final String ZOOKEEPER_DOLPHINSCHEDULER_MASTERS = 
"/nodes/master";
 
     /**
      * WorkerServer directory registered in zookeeper
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
index df1eac3..db7cdbb 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
@@ -725,7 +725,7 @@ public class MasterExecThread implements Runnable {
         ProcessInstance instance = 
processService.findProcessInstanceById(processInstance.getId());
         ExecutionStatus state = instance.getState();
 
-        if(activeTaskNode.size() > 0){
+        if(activeTaskNode.size() > 0 || haveRetryTaskStandBy()){
             return runningState(state);
         }
         // process failure
@@ -769,6 +769,24 @@ public class MasterExecThread implements Runnable {
     }
 
     /**
+     * whether standby task list have retry tasks
+     * @return
+     */
+    private boolean haveRetryTaskStandBy() {
+
+        boolean result = false;
+
+        for(String taskName : readyToSubmitTaskList.keySet()){
+            TaskInstance task = readyToSubmitTaskList.get(taskName);
+            if(task.getState().typeIsFailure()){
+                result = true;
+                break;
+            }
+        }
+        return result;
+    }
+
+    /**
      * whether complement end
      * @return Boolean whether is complement end
      */
@@ -856,7 +874,11 @@ public class MasterExecThread implements Runnable {
         // submit start node
         submitPostNode(null);
         boolean sendTimeWarning = false;
-        while(!processInstance.IsProcessInstanceStop()){
+        while(Stopper.isRunning()){
+
+            if(processInstance.IsProcessInstanceStop()){
+                break;
+            }
 
             // send warning email if process time out.
             if( !sendTimeWarning && checkProcessTimeOut(processInstance) ){
@@ -871,12 +893,21 @@ public class MasterExecThread implements Runnable {
                 if(!future.isDone()){
                     continue;
                 }
+
                 // node monitor thread complete
-                activeTaskNode.remove(entry.getKey());
+                task = this.processService.findTaskInstanceById(task.getId());
+
                 if(task == null){
                     this.taskFailedSubmit = true;
+                    activeTaskNode.remove(entry.getKey());
                     continue;
                 }
+
+                // node monitor thread complete
+                if(task.getState().typeIsFinished()){
+                    activeTaskNode.remove(entry.getKey());
+                }
+
                 logger.info("task :{}, id:{} complete, state is {} ",
                         task.getName(), task.getId(), 
task.getState().toString());
                 //TODO  node success , post node submit
diff --git a/pom.xml b/pom.xml
index b17573d..e2b0e66 100644
--- a/pom.xml
+++ b/pom.xml
@@ -712,20 +712,44 @@
                         <include>**/alert/utils/FuncUtilsTest.java</include>
                         <include>**/alert/utils/JSONUtilsTest.java</include>
                         
<include>**/alert/utils/PropertyUtilsTest.java</include>
-                        
<include>**/server/utils/SparkArgsUtilsTest.java</include>
-                        
<include>**/server/utils/FlinkArgsUtilsTest.java</include>
-                        <include>**/server/utils/ParamUtilsTest.java</include>
-                        
<include>**/server/master/MasterExecThreadTest.java</include>
+                        
<include>**/alert/template/AlertTemplateFactoryTest.java</include>
+                        
<include>**/alert/template/impl/DefaultHTMLTemplateTest.java</include>
                         
<include>**/dao/mapper/AccessTokenMapperTest.java</include>
                         
<include>**/dao/mapper/AlertGroupMapperTest.java</include>
                         <include>**/dao/mapper/AlertMapperTest.java</include>
                         <include>**/dao/mapper/CommandMapperTest.java</include>
                         <include>**/dao/cron/CronUtilsTest.java</include>
                         <include>**/dao/utils/DagHelperTest.java</include>
-                        
<include>**/alert/template/AlertTemplateFactoryTest.java</include>
-                        
<include>**/alert/template/impl/DefaultHTMLTemplateTest.java</include>
                         
<include>**/server/worker/task/datax/DataxTaskTest.java</include>
                         <include>**/server/utils/DataxUtilsTest.java</include>
+                        
<include>**/server/utils/SparkArgsUtilsTest.java</include>
+                        
<include>**/server/utils/FlinkArgsUtilsTest.java</include>
+                        <include>**/server/utils/ParamUtilsTest.java</include>
+                        
<include>**/server/log/MasterLogFilterTest.java</include>
+                        
<include>**/server/log/SensitiveDataConverterTest.java</include>
+                        
<include>**/server/log/TaskLogDiscriminatorTest.java</include>
+                        <include>**/server/log/TaskLogFilterTest.java</include>
+                        
<include>**/server/log/WorkerLogFilterTest.java</include>
+                        
<include>**/server/master/executor/NettyExecutorManagerTest.java</include>
+                        
<include>**/server/master/host/LowerWeightRoundRobinTest.java</include>
+                        
<include>**/server/master/register/MasterRegistryTest.java</include>
+                        
<include>**/server/master/AlertManagerTest.java</include>
+                        
<include>**/server/master/MasterCommandTest.java</include>
+                        
<include>**/server/master/MasterExecThreadTest.java</include>
+                        <include>**/server/master/ParamsTest.java</include>
+                        
<include>**/server/register/ZookeeperNodeManagerTest.java</include>
+                        <include>**/server/utils/DataxUtilsTest.java</include>
+                        
<include>**/server/utils/FlinkArgsUtilsTest.java</include>
+                        <include>**/server/utils/ParamUtilsTest.java</include>
+                        
<include>**/server/utils/ProcessUtilsTest.java</include>
+                        
<include>**/server/utils/SparkArgsUtilsTest.java</include>
+                        
<include>**/server/worker/register/WorkerRegistryTest.java</include>
+                        
<include>**/server/worker/shell/ShellCommandExecutorTest.java</include>
+                        
<include>**/server/worker/sql/SqlExecutorTest.java</include>
+                        
<include>**/server/worker/task/datax/DataxTaskTest.java</include>
+                        
<include>**/server/worker/task/dependent/DependentTaskTest.java</include>
+                        
<include>**/server/worker/task/spark/SparkTaskTest.java</include>
+                        
<include>**/server/worker/task/EnvFileTest.java</include>
                     </includes>
                     <!-- <skip>true</skip> -->
                     <argLine>-Xmx2048m</argLine>

Reply via email to