This is an automated email from the ASF dual-hosted git repository.

journey pushed a commit to branch refactor-worker
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/refactor-worker by this push:
     new eb45ff9  TaskPriority refactor (#2097)
eb45ff9 is described below

commit eb45ff9e83534e569352511c2a7bf03ac06f5106
Author: qiaozhanwei <[email protected]>
AuthorDate: Fri Mar 6 12:02:14 2020 +0800

    TaskPriority refactor (#2097)
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    * TaskExecutionContext create modify
    
    * buildAckCommand taskInstanceId not set modify
    
    * java doc error modify
    
    * add comment
    
    * ExecutorManager interface add generic type
    
    * add TaskInstanceCacheManager receive Worker report result
    
    * TaskInstance setExecutePath
    
    * add TaskInstanceCacheManager to receive Worker Task result report
    
    * TaskInstanceCacheManager add remove method
    
    * add license
    
    * add dispatcht task method
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * taskInstanceCache is null ,need load from db
    
    * taskInstanceCache is null ,need load from db
    
    * taskInstanceCache is null ,need load from db
    
    * 1,worker TaskPros use TaskExecutionContext replase
    2,Master kill Task , KillTaskProcessor modify
    
    * worker remove db
    
    * ShellTask modify
    
    * master persistence processId and appIds
    
    * master persistence processId and appIds
    
    * master add kill task logic
    
    * master add kill task logic
    
    * master add kill task logic
    
    * javadoc error modify
    
    * remove chinese log
    
    * executeDirectly method add Override
    
    * remote module modify
    
    * TaskKillResponseProcessor command type modify
    
    * create buildKillCommand
    
    * host add host:port format
    
    * host add host:port format
    
    * TaskAckProcessor modify
    
    * TaskAckProcessor modify
    
    * task prioriry refator
    
    * remove ITaskQueue
    
    * task prioriry refator
    
    * remove ITaskQueue
    
    * TaskPriority refactor
    
    * remove logs
    
    Co-authored-by: qiaozhanwei <[email protected]>
---
 .../api/controller/ProcessInstanceController.java  |  7 +--
 .../api/service/DataAnalysisService.java           |  7 +--
 .../api/service/ProcessInstanceService.java        | 52 +---------------------
 .../api/service/DataAnalysisServiceTest.java       | 28 +-----------
 .../dolphinscheduler/dao/entity/TaskInstance.java  |  1 +
 .../server/entity/TaskPriority.java                |  3 +-
 .../server/master/MasterServer.java                |  5 ++-
 .../master/runner/MasterBaseTaskExecThread.java    |  5 +--
 8 files changed, 15 insertions(+), 93 deletions(-)

diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java
index 80db6c8..b6533ad 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java
@@ -26,7 +26,6 @@ import 
org.apache.dolphinscheduler.common.utils.ParameterUtils;
 import org.apache.dolphinscheduler.common.utils.StringUtils;
 import org.apache.dolphinscheduler.dao.entity.User;
 import io.swagger.annotations.*;
-import org.apache.dolphinscheduler.service.queue.ITaskQueue;
 import org.apache.dolphinscheduler.service.queue.TaskQueueFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -240,8 +239,7 @@ public class ProcessInstanceController extends 
BaseController{
             logger.info("delete process instance by id, login user:{}, project 
name:{}, process instance id:{}",
                     loginUser.getUserName(), projectName, processInstanceId);
             // task queue
-            ITaskQueue tasksQueue = TaskQueueFactory.getTaskQueueInstance();
-            Map<String, Object> result = 
processInstanceService.deleteProcessInstanceById(loginUser, projectName, 
processInstanceId,tasksQueue);
+            Map<String, Object> result = 
processInstanceService.deleteProcessInstanceById(loginUser, projectName, 
processInstanceId);
             return returnDataList(result);
         }catch (Exception e){
             logger.error(DELETE_PROCESS_INSTANCE_BY_ID_ERROR.getMsg(),e);
@@ -370,7 +368,6 @@ public class ProcessInstanceController extends 
BaseController{
             logger.info("delete process instance by ids, login user:{}, 
project name:{}, process instance ids :{}",
                     loginUser.getUserName(), projectName, processInstanceIds);
             // task queue
-            ITaskQueue tasksQueue = TaskQueueFactory.getTaskQueueInstance();
             Map<String, Object> result = new HashMap<>(5);
             List<String> deleteFailedIdList = new ArrayList<>();
             if(StringUtils.isNotEmpty(processInstanceIds)){
@@ -379,7 +376,7 @@ public class ProcessInstanceController extends 
BaseController{
                 for (String strProcessInstanceId:processInstanceIdArray) {
                     int processInstanceId = 
Integer.parseInt(strProcessInstanceId);
                     try {
-                        Map<String, Object> deleteResult = 
processInstanceService.deleteProcessInstanceById(loginUser, projectName, 
processInstanceId,tasksQueue);
+                        Map<String, Object> deleteResult = 
processInstanceService.deleteProcessInstanceById(loginUser, projectName, 
processInstanceId);
                         
if(!Status.SUCCESS.equals(deleteResult.get(Constants.STATUS))){
                             deleteFailedIdList.add(strProcessInstanceId);
                             
logger.error((String)deleteResult.get(Constants.MSG));
diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/DataAnalysisService.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/DataAnalysisService.java
index bafe833..0c93e00 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/DataAnalysisService.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/DataAnalysisService.java
@@ -29,8 +29,6 @@ import org.apache.dolphinscheduler.common.utils.StringUtils;
 import org.apache.dolphinscheduler.dao.entity.*;
 import org.apache.dolphinscheduler.dao.mapper.*;
 import org.apache.dolphinscheduler.service.process.ProcessService;
-import org.apache.dolphinscheduler.service.queue.ITaskQueue;
-import org.apache.dolphinscheduler.service.queue.TaskQueueFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -318,9 +316,8 @@ public class DataAnalysisService extends BaseService{
             return result;
         }
 
-        ITaskQueue tasksQueue = TaskQueueFactory.getTaskQueueInstance();
-        List<String> tasksQueueList = 
tasksQueue.getAllTasks(Constants.DOLPHINSCHEDULER_TASKS_QUEUE);
-        List<String> tasksKillList = 
tasksQueue.getAllTasks(Constants.DOLPHINSCHEDULER_TASKS_KILL);
+        List<String> tasksQueueList = new ArrayList<>();
+        List<String> tasksKillList = new ArrayList<>();
 
         Map<String,Integer> dataMap = new HashMap<>();
         if (loginUser.getUserType() == UserType.ADMIN_USER){
diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
index 2b1f04e..4b809a8 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
@@ -38,7 +38,6 @@ import 
com.baomidou.mybatisplus.extension.plugins.pagination.Page;
 import org.apache.dolphinscheduler.dao.entity.*;
 import org.apache.dolphinscheduler.dao.mapper.*;
 import org.apache.dolphinscheduler.service.process.ProcessService;
-import org.apache.dolphinscheduler.service.queue.ITaskQueue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -404,8 +403,6 @@ public class ProcessInstanceService extends BaseDAGService {
             processInstance.setProcessInstanceJson(processInstanceJson);
             processInstance.setGlobalParams(globalParams);
         }
-//        int update = processDao.updateProcessInstance(processInstanceId, 
processInstanceJson,
-//                globalParams, schedule, flag, locations, connects);
         int update = processService.updateProcessInstance(processInstance);
         int updateDefine = 1;
         if (syncDefine && StringUtils.isNotEmpty(processInstanceJson)) {
@@ -472,11 +469,10 @@ public class ProcessInstanceService extends 
BaseDAGService {
      * @param loginUser login user
      * @param projectName project name
      * @param processInstanceId process instance id
-     * @param tasksQueue task queue
      * @return delete result code
      */
     @Transactional(rollbackFor = Exception.class)
-    public Map<String, Object> deleteProcessInstanceById(User loginUser, 
String projectName, Integer processInstanceId, ITaskQueue tasksQueue) {
+    public Map<String, Object> deleteProcessInstanceById(User loginUser, 
String projectName, Integer processInstanceId) {
 
         Map<String, Object> result = new HashMap<>(5);
         Project project = projectMapper.queryByName(projectName);
@@ -494,52 +490,6 @@ public class ProcessInstanceService extends BaseDAGService 
{
             return result;
         }
 
-        //process instance priority
-        int processInstancePriority = 
processInstance.getProcessInstancePriority().ordinal();
-        // delete zk queue
-        if (CollectionUtils.isNotEmpty(taskInstanceList)){
-            for (TaskInstance taskInstance : taskInstanceList){
-                // task instance priority
-                int taskInstancePriority = 
taskInstance.getTaskInstancePriority().ordinal();
-
-                StringBuilder nodeValueSb = new StringBuilder(100);
-                nodeValueSb.append(processInstancePriority)
-                        .append(UNDERLINE)
-                        .append(processInstanceId)
-                        .append(UNDERLINE)
-                        .append(taskInstancePriority)
-                        .append(UNDERLINE)
-                        .append(taskInstance.getId())
-                        .append(UNDERLINE);
-
-                int taskWorkerGroupId = 
processService.getTaskWorkerGroupId(taskInstance);
-                WorkerGroup workerGroup = 
workerGroupMapper.selectById(taskWorkerGroupId);
-
-                if(workerGroup == null){
-                    nodeValueSb.append(DEFAULT_WORKER_ID);
-                }else {
-
-                    String ips = workerGroup.getIpList();
-                    StringBuilder ipSb = new StringBuilder(100);
-                    String[] ipArray = ips.split(COMMA);
-
-                    for (String ip : ipArray) {
-                        long ipLong = IpUtils.ipToLong(ip);
-                        ipSb.append(ipLong).append(COMMA);
-                    }
-
-                    if(ipSb.length() > 0) {
-                        ipSb.deleteCharAt(ipSb.length() - 1);
-                    }
-                    nodeValueSb.append(ipSb);
-                }
-
-                logger.info("delete task queue node : 
{}",nodeValueSb.toString());
-                
tasksQueue.removeNode(org.apache.dolphinscheduler.common.Constants.DOLPHINSCHEDULER_TASKS_QUEUE,
 nodeValueSb.toString());
-
-            }
-        }
-
         // delete database cascade
         int delete = 
processService.deleteWorkProcessInstanceById(processInstanceId);
         processService.deleteAllSubWorkProcessByParentId(processInstanceId);
diff --git 
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataAnalysisServiceTest.java
 
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataAnalysisServiceTest.java
index 6f308e7..10220e2 100644
--- 
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataAnalysisServiceTest.java
+++ 
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataAnalysisServiceTest.java
@@ -28,7 +28,6 @@ import org.apache.dolphinscheduler.dao.entity.Project;
 import org.apache.dolphinscheduler.dao.entity.User;
 import org.apache.dolphinscheduler.dao.mapper.*;
 import org.apache.dolphinscheduler.service.process.ProcessService;
-import org.apache.dolphinscheduler.service.queue.ITaskQueue;
 import org.apache.dolphinscheduler.service.queue.TaskQueueFactory;
 import org.junit.After;
 import org.junit.Assert;
@@ -74,8 +73,7 @@ public class DataAnalysisServiceTest {
     @Mock
     TaskInstanceMapper taskInstanceMapper;
 
-    @Mock
-    ITaskQueue taskQueue;
+
 
     @Mock
     ProcessService processService;
@@ -183,30 +181,6 @@ public class DataAnalysisServiceTest {
 
     }
 
-    @Test
-    public void testCountQueueState(){
-
-        PowerMockito.mockStatic(TaskQueueFactory.class);
-        List<String>  taskQueueList = new ArrayList<>(1);
-        taskQueueList.add("1_0_1_1_-1");
-        List<String>  taskKillList = new ArrayList<>(1);
-        taskKillList.add("1-0");
-        
PowerMockito.when(taskQueue.getAllTasks(Constants.DOLPHINSCHEDULER_TASKS_QUEUE)).thenReturn(taskQueueList);
-        
PowerMockito.when(taskQueue.getAllTasks(Constants.DOLPHINSCHEDULER_TASKS_KILL)).thenReturn(taskKillList);
-        
PowerMockito.when(TaskQueueFactory.getTaskQueueInstance()).thenReturn(taskQueue);
-        //checkProject false
-        Map<String, Object> result = 
dataAnalysisService.countQueueState(user,2);
-        Assert.assertTrue(result.isEmpty());
-
-        result = dataAnalysisService.countQueueState(user,1);
-        Assert.assertEquals(Status.SUCCESS,result.get(Constants.STATUS));
-        //admin
-        user.setUserType(UserType.ADMIN_USER);
-        result = dataAnalysisService.countQueueState(user,1);
-        Assert.assertEquals(Status.SUCCESS,result.get(Constants.STATUS));
-
-    }
-
     /**
      *  get list
      * @return
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
index dc463fe..3fc40ca 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
@@ -194,6 +194,7 @@ public class TaskInstance implements Serializable {
     /**
      * workerGroup
      */
+    @TableField(exist = false)
     private String workerGroup;
 
     public ProcessInstance getProcessInstance() {
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskPriority.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskPriority.java
index 7db5f45..991eeed 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskPriority.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskPriority.java
@@ -132,7 +132,8 @@ public class TaskPriority {
      */
     public static TaskPriority of(String taskPriorityInfo){
         String[] parts = taskPriorityInfo.split(UNDERLINE);
-        if (parts.length != 4) {
+
+        if (parts.length != 5) {
             throw new IllegalArgumentException(String.format("TaskPriority : 
%s illegal.", taskPriorityInfo));
         }
         TaskPriority taskPriority = new TaskPriority(
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
index 6923cd0..12fe25b 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
@@ -26,6 +26,7 @@ import org.apache.dolphinscheduler.remote.NettyRemotingServer;
 import org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+import 
org.apache.dolphinscheduler.server.master.consumer.TaskUpdateQueueConsumer;
 import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor;
 import 
org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor;
 import 
org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor;
@@ -37,6 +38,7 @@ import 
org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.apache.dolphinscheduler.service.quartz.ProcessScheduleJob;
 import org.apache.dolphinscheduler.service.quartz.QuartzExecutors;
+import org.apache.dolphinscheduler.service.queue.TaskUpdateQueueImpl;
 import org.quartz.SchedulerException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -166,7 +168,8 @@ public class MasterServer implements IStoppable {
             logger.error("start Quartz failed", e);
         }
 
-
+        TaskUpdateQueueConsumer taskUpdateQueueConsumer = 
SpringApplicationContext.getBean(TaskUpdateQueueConsumer.class);
+        taskUpdateQueueConsumer.start();
         /**
          *  register hooks, which are called before the process exits
          */
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
index c8b7b0e..b0fd632 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
@@ -89,7 +89,7 @@ public class MasterBaseTaskExecThread implements 
Callable<Boolean> {
         this.cancel = false;
         this.taskInstance = taskInstance;
         this.masterConfig = 
SpringApplicationContext.getBean(MasterConfig.class);
-        this.taskUpdateQueue = new TaskUpdateQueueImpl();
+        this.taskUpdateQueue = 
SpringApplicationContext.getBean(TaskUpdateQueueImpl.class);
     }
 
     /**
@@ -180,8 +180,7 @@ public class MasterBaseTaskExecThread implements 
Callable<Boolean> {
                     processInstance.getId(),
                     taskInstance.getProcessInstancePriority().getCode(),
                     taskInstance.getId(),
-                    taskInstance.getWorkerGroup());
-
+                    
org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP);
             taskUpdateQueue.put(taskPriorityInfo);
             logger.info(String.format("master submit success, task : %s", 
taskInstance.getName()) );
             return true;

Reply via email to