This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch refactor-worker
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/refactor-worker by this push:
     new f112415  TaskManager refactor (#2302)
f112415 is described below

commit f112415b13d7fdd18cf668c2cc639e79cdf08458
Author: qiaozhanwei <[email protected]>
AuthorDate: Tue Mar 24 18:54:45 2020 +0800

    TaskManager refactor (#2302)
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    * TaskExecutionContext create modify
    
    * buildAckCommand taskInstanceId not set modify
    
    * java doc error modify
    
    * add comment
    
    * ExecutorManager interface add generic type
    
    * add TaskInstanceCacheManager receive Worker report result
    
    * TaskInstance setExecutePath
    
    * add TaskInstanceCacheManager to receive Worker Task result report
    
    * TaskInstanceCacheManager add remove method
    
    * add license
    
    * add dispatch task method
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * taskInstanceCache is null ,need load from db
    
    * taskInstanceCache is null ,need load from db
    
    * taskInstanceCache is null ,need load from db
    
    * 1. worker: replace TaskProps with TaskExecutionContext
    2. master kill task: modify KillTaskProcessor
    
    * worker remove db
    
    * ShellTask modify
    
    * master persistence processId and appIds
    
    * master persistence processId and appIds
    
    * master add kill task logic
    
    * master add kill task logic
    
    * master add kill task logic
    
    * javadoc error modify
    
    * remove chinese log
    
    * executeDirectly method add Override
    
    * remote module modify
    
    * TaskKillResponseProcessor command type modify
    
    * create buildKillCommand
    
    * host add host:port format
    
    * host add host:port format
    
    * TaskAckProcessor modify
    
    * TaskAckProcessor modify
    
    * task priority refactor
    
    * remove ITaskQueue
    
    * task priority refactor
    
    * remove ITaskQueue
    
    * TaskPriority refactor
    
    * remove logs
    
    * WorkerServer refactor
    
    * MasterSchedulerService modify
    
    * WorkerConfig listen port modify
    
    * modify master and worker listen port
    
    * cancelTaskInstance set TaskExecutionContext host,logPath,executePath
    
    * cancelTaskInstance set TaskExecutionContext host,logPath,executePath
    
    * Encapsulate the parameters required by sqltask
    
    * 1,Encapsulate the parameters required by sqltask
    2,SQLTask optimization
    
    * AbstractTask modify
    
    * ProcedureTask optimization
    
    * MasterSchedulerService modify
    
    * TaskUpdateQueueConsumer modify
    
    * test
    
    * DataxTask process run debug
    
    * DataxTask process run debug
    
    * add protobuf dependency; MR, Spark and other tasks need it
    
    * TaskUpdateQueueConsumer modify
    
    * TaskExecutionContextBuilder set TaskInstance workgroup
    
    * WorkerGroupService queryAllGroup modify
    query available work group
    
    * 1,get workergroup from zk modify
    2,SpringConnectionFactory repeat load modify
    
    * master and worker register ip  use OSUtils.getHost()
    
    * ProcessInstance host set ip:port format
    
    * worker fault tolerance modify
    
    * Constants and .env modify
    
    * master fault tolerant bug modify
    
    * UT add pom.xml
    
    * timing online  modify
    
    * when the taskResponse is written to the db before the taskAck, the task state ends up wrong;
    add an async queue and a new thread to resolve this problem
    
    * TaskExecutionContext set host
    
    * 1. TaskManager refactor
    2. api start load server: modify dolphinscheduler-daemon.sh
    
    * 1. TaskManager refactor
    2. api start load server: modify dolphinscheduler-daemon.sh
    
    * add UT in pom.xml
    
    * revert dolphinscheduler-daemon.sh
    
    Co-authored-by: qiaozhanwei <[email protected]>
---
 .../master/consumer/TaskUpdateQueueConsumer.java   |  8 ++-
 .../server/master/manager/TaskEvent.java           | 36 ++++++--------
 .../server/master/manager/TaskEventEnum.java       | 58 ++++++++++++++++++++++
 .../server/master/manager/TaskManager.java         | 44 +++++++++-------
 .../server/master/processor/TaskAckProcessor.java  |  6 +--
 .../master/processor/TaskResponseProcessor.java    |  6 +--
 pom.xml                                            |  4 ++
 7 files changed, 109 insertions(+), 53 deletions(-)

diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java
index 5f66c1b..e7b6327 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java
@@ -18,7 +18,6 @@
 package org.apache.dolphinscheduler.server.master.consumer;
 
 import com.alibaba.fastjson.JSONObject;
-import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.common.enums.TaskType;
 import org.apache.dolphinscheduler.common.enums.UdfType;
@@ -89,12 +88,11 @@ public class TaskUpdateQueueConsumer extends Thread{
     public void run() {
         while (Stopper.isRunning()){
             try {
-                if (taskUpdateQueue.size() == 0){
-                    Thread.sleep(Constants.SLEEP_TIME_MILLIS);
-                    continue;
-                }
+                // if not task , blocking here
                 String taskPriorityInfo = taskUpdateQueue.take();
+
                 TaskPriority taskPriority = TaskPriority.of(taskPriorityInfo);
+
                 dispatch(taskPriority.getTaskId());
             }catch (Exception e){
                 logger.error("dispatcher task error",e);
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/manager/TaskEvent.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/manager/TaskEvent.java
index 5c6740f..98b8d62 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/manager/TaskEvent.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/manager/TaskEvent.java
@@ -26,9 +26,6 @@ import java.util.Date;
  */
 public class TaskEvent {
 
-    public static final String ACK = "ack";
-    public static final String RESPONSE = "response";
-
     /**
      * taskInstanceId
      */
@@ -77,7 +74,7 @@ public class TaskEvent {
     /**
      * ack / response
      */
-    private String type;
+    private TaskEventEnum type;
 
 
     /**
@@ -88,22 +85,21 @@ public class TaskEvent {
      * @param executePath executePath
      * @param logPath logPath
      * @param taskInstanceId taskInstanceId
-     * @param type type
      */
-    public void receiveAck(ExecutionStatus state,
-                           Date startTime,
-                           String workerAddress,
-                           String executePath,
-                           String logPath,
-                           int taskInstanceId,
-                           String type){
+    public TaskEvent(ExecutionStatus state,
+                     Date startTime,
+                     String workerAddress,
+                     String executePath,
+                     String logPath,
+                     int taskInstanceId){
         this.state = state;
         this.startTime = startTime;
         this.workerAddress = workerAddress;
         this.executePath = executePath;
         this.logPath = logPath;
         this.taskInstanceId = taskInstanceId;
-        this.type = type;
+        this.type = TaskEventEnum.ACK;
+
     }
 
     /**
@@ -113,20 +109,18 @@ public class TaskEvent {
      * @param processId processId
      * @param appIds appIds
      * @param taskInstanceId taskInstanceId
-     * @param type type
      */
-    public void receiveResponse(ExecutionStatus state,
+    public TaskEvent(ExecutionStatus state,
                                 Date endTime,
                                 int processId,
                                 String appIds,
-                                int taskInstanceId,
-                                String type){
+                                int taskInstanceId){
         this.state = state;
         this.endTime = endTime;
         this.processId = processId;
         this.appIds = appIds;
         this.taskInstanceId = taskInstanceId;
-        this.type = type;
+        this.type = TaskEventEnum.RESPONSE;
     }
 
     public int getTaskInstanceId() {
@@ -201,11 +195,11 @@ public class TaskEvent {
         this.appIds = appIds;
     }
 
-    public String getType() {
+    public TaskEventEnum getType() {
         return type;
     }
 
-    public void setType(String type) {
+    public void setType(TaskEventEnum type) {
         this.type = type;
     }
 
@@ -221,7 +215,7 @@ public class TaskEvent {
                 ", logPath='" + logPath + '\'' +
                 ", processId=" + processId +
                 ", appIds='" + appIds + '\'' +
-                ", type='" + type + '\'' +
+                ", type=" + type +
                 '}';
     }
 }
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/manager/TaskEventEnum.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/manager/TaskEventEnum.java
new file mode 100644
index 0000000..f3d7497
--- /dev/null
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/manager/TaskEventEnum.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.manager;
+
+import com.baomidou.mybatisplus.annotation.EnumValue;
+import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
+
+import java.util.Date;
+
+/**
+ * task event enum
+ */
+public enum TaskEventEnum {
+
+    ACK(0, "task ack"),
+    RESPONSE(1, "task response result");
+
+    TaskEventEnum(int code, String descp){
+        this.code = code;
+        this.descp = descp;
+    }
+
+    @EnumValue
+    private final int code;
+    private final String descp;
+
+    public String getDescp() {
+        return descp;
+    }
+
+    public int getCode() {
+        return code;
+    }
+
+    public static TaskEventEnum of(int status){
+        for(TaskEventEnum es : values()){
+            if(es.getCode() == status){
+                return es;
+            }
+        }
+        throw new IllegalArgumentException("invalid status : " + status);
+    }
+}
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/manager/TaskManager.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/manager/TaskManager.java
index a2710ee..ade0aea 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/manager/TaskManager.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/manager/TaskManager.java
@@ -17,7 +17,6 @@
 
 package org.apache.dolphinscheduler.server.master.manager;
 
-import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.thread.Stopper;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.slf4j.Logger;
@@ -28,6 +27,7 @@ import org.springframework.stereotype.Component;
 import javax.annotation.PostConstruct;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
+import static 
org.apache.dolphinscheduler.server.master.manager.TaskEventEnum.*;
 
 /**
  * task manager
@@ -56,6 +56,7 @@ public class TaskManager {
     @PostConstruct
     public void init(){
         TaskWorker taskWorker = new TaskWorker();
+        taskWorker.setName("TaskWorkerThread");
         taskWorker.start();
     }
 
@@ -83,12 +84,8 @@ public class TaskManager {
 
             while (Stopper.isRunning()){
                 try {
-                    if (attemptQueue.size() == 0){
-                        Thread.sleep(Constants.SLEEP_TIME_MILLIS);
-                        continue;
-                    }
+                    // if not task , blocking here
                     TaskEvent taskEvent = attemptQueue.take();
-
                     persist(taskEvent);
 
                 }catch (Exception e){
@@ -102,19 +99,28 @@ public class TaskManager {
          * @param taskEvent taskEvent
          */
         private void persist(TaskEvent taskEvent){
-            if (TaskEvent.ACK.equals(taskEvent.getType())){
-                processService.changeTaskState(taskEvent.getState(),
-                        taskEvent.getStartTime(),
-                        taskEvent.getWorkerAddress(),
-                        taskEvent.getExecutePath(),
-                        taskEvent.getLogPath(),
-                        taskEvent.getTaskInstanceId());
-            }else if (TaskEvent.RESPONSE.equals(taskEvent.getType())){
-                processService.changeTaskState(taskEvent.getState(),
-                        taskEvent.getEndTime(),
-                        taskEvent.getProcessId(),
-                        taskEvent.getAppIds(),
-                        taskEvent.getTaskInstanceId());
+            // task event type
+            TaskEventEnum type = taskEvent.getType();
+
+            switch (type){
+                case ACK:
+                    processService.changeTaskState(taskEvent.getState(),
+                            taskEvent.getStartTime(),
+                            taskEvent.getWorkerAddress(),
+                            taskEvent.getExecutePath(),
+                            taskEvent.getLogPath(),
+                            taskEvent.getTaskInstanceId());
+                    break;
+                case RESPONSE:
+                    processService.changeTaskState(taskEvent.getState(),
+                            taskEvent.getEndTime(),
+                            taskEvent.getProcessId(),
+                            taskEvent.getAppIds(),
+                            taskEvent.getTaskInstanceId());
+                    break;
+                default:
+                    throw new IllegalArgumentException("invalid task event 
type : " + type);
+
             }
         }
     }
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
index a678cad..67832a1 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
@@ -73,14 +73,12 @@ public class TaskAckProcessor implements 
NettyRequestProcessor {
         String workerAddress = ChannelUtils.toAddress(channel).getAddress();
 
         // TaskEvent
-        TaskEvent taskEvent = new TaskEvent();
-        taskEvent.receiveAck(ExecutionStatus.of(taskAckCommand.getStatus()),
+        TaskEvent taskEvent = new 
TaskEvent(ExecutionStatus.of(taskAckCommand.getStatus()),
                 taskAckCommand.getStartTime(),
                 workerAddress,
                 taskAckCommand.getExecutePath(),
                 taskAckCommand.getLogPath(),
-                taskAckCommand.getTaskInstanceId(),
-                TaskEvent.ACK);
+                taskAckCommand.getTaskInstanceId());
 
         taskManager.putTask(taskEvent);
 
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
index ffc5d72..9cf10b9 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
@@ -73,13 +73,11 @@ public class TaskResponseProcessor implements 
NettyRequestProcessor {
         taskInstanceCacheManager.cacheTaskInstance(responseCommand);
 
         // TaskEvent
-        TaskEvent taskEvent = new TaskEvent();
-        
taskEvent.receiveResponse(ExecutionStatus.of(responseCommand.getStatus()),
+        TaskEvent taskEvent = new 
TaskEvent(ExecutionStatus.of(responseCommand.getStatus()),
                 responseCommand.getEndTime(),
                 responseCommand.getProcessId(),
                 responseCommand.getAppIds(),
-                responseCommand.getTaskInstanceId(),
-                TaskEvent.RESPONSE);
+                responseCommand.getTaskInstanceId());
 
         taskManager.putTask(taskEvent);
     }
diff --git a/pom.xml b/pom.xml
index e2b0e66..e7b95d9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -732,6 +732,8 @@
                         
<include>**/server/log/WorkerLogFilterTest.java</include>
                         
<include>**/server/master/executor/NettyExecutorManagerTest.java</include>
                         
<include>**/server/master/host/LowerWeightRoundRobinTest.java</include>
+                        
<include>**/server/master/host/RandomSelectorTest.java</include>
+                        
<include>**/server/master/host/RoundRobinSelectorTest.java</include>
                         
<include>**/server/master/register/MasterRegistryTest.java</include>
                         
<include>**/server/master/AlertManagerTest.java</include>
                         
<include>**/server/master/MasterCommandTest.java</include>
@@ -743,6 +745,7 @@
                         <include>**/server/utils/ParamUtilsTest.java</include>
                         
<include>**/server/utils/ProcessUtilsTest.java</include>
                         
<include>**/server/utils/SparkArgsUtilsTest.java</include>
+                        
<include>**/server/worker/processor/TaskCallbackServiceTest.java</include>
                         
<include>**/server/worker/register/WorkerRegistryTest.java</include>
                         
<include>**/server/worker/shell/ShellCommandExecutorTest.java</include>
                         
<include>**/server/worker/sql/SqlExecutorTest.java</include>
@@ -750,6 +753,7 @@
                         
<include>**/server/worker/task/dependent/DependentTaskTest.java</include>
                         
<include>**/server/worker/task/spark/SparkTaskTest.java</include>
                         
<include>**/server/worker/task/EnvFileTest.java</include>
+
                     </includes>
                     <!-- <skip>true</skip> -->
                     <argLine>-Xmx2048m</argLine>

Reply via email to