This is an automated email from the ASF dual-hosted git repository.

journey pushed a commit to branch refactor-worker
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/refactor-worker by this push:
     new 8aaee39  task prioriry refator (#2094)
8aaee39 is described below

commit 8aaee39aa28e9d61eac25aee181fd48e790b929c
Author: qiaozhanwei <[email protected]>
AuthorDate: Thu Mar 5 18:25:48 2020 +0800

    task prioriry refator (#2094)
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    * TaskExecutionContext create modify
    
    * buildAckCommand taskInstanceId not set modify
    
    * java doc error modify
    
    * add comment
    
    * ExecutorManager interface add generic type
    
    * add TaskInstanceCacheManager receive Worker report result
    
    * TaskInstance setExecutePath
    
    * add TaskInstanceCacheManager to receive Worker Task result report
    
    * TaskInstanceCacheManager add remove method
    
    * add license
    
    * add dispatcht task method
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * taskInstanceCache is null ,need load from db
    
    * taskInstanceCache is null ,need load from db
    
    * taskInstanceCache is null ,need load from db
    
    * 1,worker TaskPros use TaskExecutionContext replase
    2,Master kill Task , KillTaskProcessor modify
    
    * worker remove db
    
    * ShellTask modify
    
    * master persistence processId and appIds
    
    * master persistence processId and appIds
    
    * master add kill task logic
    
    * master add kill task logic
    
    * master add kill task logic
    
    * javadoc error modify
    
    * remove chinese log
    
    * executeDirectly method add Override
    
    * remote module modify
    
    * TaskKillResponseProcessor command type modify
    
    * create buildKillCommand
    
    * host add host:port format
    
    * host add host:port format
    
    * TaskAckProcessor modify
    
    * TaskAckProcessor modify
    
    * task prioriry refator
    
    * remove ITaskQueue
---
 .../apache/dolphinscheduler/common/Constants.java  |   2 +
 .../dolphinscheduler/dao/entity/TaskInstance.java  |  14 +
 .../server/entity/TaskPriority.java                | 146 ++++++++
 .../master/consumer/TaskUpdateQueueConsumer.java   | 169 ++++++++++
 .../server/master/processor/TaskAckProcessor.java  |   5 +-
 .../master/runner/MasterBaseTaskExecThread.java    | 152 +++------
 .../server/master/runner/MasterTaskExecThread.java |   5 +-
 .../server/worker/WorkerServer.java                |   8 +-
 .../service/process/ProcessService.java            |  42 +--
 .../dolphinscheduler/service/queue/ITaskQueue.java | 102 ------
 .../service/queue/TaskQueueFactory.java            |   4 +-
 .../service/queue/TaskQueueZkImpl.java             | 375 ---------------------
 .../service/queue/TaskUpdateQueue.java}            |  50 ++-
 .../service/queue/TaskUpdateQueueImpl.java         | 115 +++++++
 .../src/test/java/queue/TaskQueueZKImplTest.java   | 229 -------------
 .../src/test/java/queue/TaskUpdateQueueTest.java   |  59 ++++
 16 files changed, 588 insertions(+), 889 deletions(-)

diff --git 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
index 2aff56e..67ce5fd 100644
--- 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
+++ 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
@@ -1004,4 +1004,6 @@ public final class Constants {
      * default worker group
      */
     public static final String DEFAULT_WORKER_GROUP = "default";
+
+    public static final Integer TASK_INFO_LENGTH = 5;
 }
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
index e444ad2..dc463fe 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
@@ -191,6 +191,11 @@ public class TaskInstance implements Serializable {
      */
     private int workerGroupId;
 
+    /**
+     * workerGroup
+     */
+    private String workerGroup;
+
     public ProcessInstance getProcessInstance() {
         return processInstance;
     }
@@ -460,6 +465,14 @@ public class TaskInstance implements Serializable {
         this.dependentResult = dependentResult;
     }
 
+    public String getWorkerGroup() {
+        return workerGroup;
+    }
+
+    public void setWorkerGroup(String workerGroup) {
+        this.workerGroup = workerGroup;
+    }
+
     @Override
     public String toString() {
         return "TaskInstance{" +
@@ -492,6 +505,7 @@ public class TaskInstance implements Serializable {
                 ", processInstancePriority=" + processInstancePriority +
                 ", dependentResult='" + dependentResult + '\'' +
                 ", workerGroupId=" + workerGroupId +
+                ", workerGroup='" + workerGroup + '\'' +
                 '}';
     }
 }
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskPriority.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskPriority.java
new file mode 100644
index 0000000..7db5f45
--- /dev/null
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskPriority.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.entity;
+
+import static org.apache.dolphinscheduler.common.Constants.*;
+
+/**
+ *  task priority info
+ */
+public class TaskPriority {
+
+    /**
+     * processInstancePriority
+     */
+    private int processInstancePriority;
+
+    /**
+     * processInstanceId
+     */
+    private int processInstanceId;
+
+    /**
+     * taskInstancePriority
+     */
+    private int taskInstancePriority;
+
+    /**
+     * taskId
+     */
+    private int taskId;
+
+    /**
+     * groupName
+     */
+    private String groupName;
+
+    /**
+     *   
${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}_${groupName}
+     */
+    private String taskPriorityInfo;
+
+    public TaskPriority(){}
+
+    public TaskPriority(int processInstancePriority,
+                        int processInstanceId,
+                        int taskInstancePriority,
+                        int taskId, String groupName) {
+        this.processInstancePriority = processInstancePriority;
+        this.processInstanceId = processInstanceId;
+        this.taskInstancePriority = taskInstancePriority;
+        this.taskId = taskId;
+        this.groupName = groupName;
+        this.taskPriorityInfo = this.processInstancePriority +
+                UNDERLINE +
+                this.processInstanceId +
+                UNDERLINE +
+                this.taskInstancePriority +
+                UNDERLINE +
+                this.taskId +
+                UNDERLINE +
+                this.groupName;
+    }
+
+    public int getProcessInstancePriority() {
+        return processInstancePriority;
+    }
+
+    public void setProcessInstancePriority(int processInstancePriority) {
+        this.processInstancePriority = processInstancePriority;
+    }
+
+    public int getProcessInstanceId() {
+        return processInstanceId;
+    }
+
+    public void setProcessInstanceId(int processInstanceId) {
+        this.processInstanceId = processInstanceId;
+    }
+
+    public int getTaskInstancePriority() {
+        return taskInstancePriority;
+    }
+
+    public void setTaskInstancePriority(int taskInstancePriority) {
+        this.taskInstancePriority = taskInstancePriority;
+    }
+
+    public int getTaskId() {
+        return taskId;
+    }
+
+    public void setTaskId(int taskId) {
+        this.taskId = taskId;
+    }
+
+    public String getGroupName() {
+        return groupName;
+    }
+
+    public void setGroupName(String groupName) {
+        this.groupName = groupName;
+    }
+
+    public String getTaskPriorityInfo() {
+        return taskPriorityInfo;
+    }
+
+    public void setTaskPriorityInfo(String taskPriorityInfo) {
+        this.taskPriorityInfo = taskPriorityInfo;
+    }
+
+    /**
+     * taskPriorityInfo convert taskPriority
+     *
+     * @param taskPriorityInfo taskPriorityInfo
+     * @return TaskPriority
+     */
+    public static TaskPriority of(String taskPriorityInfo){
+        String[] parts = taskPriorityInfo.split(UNDERLINE);
+        if (parts.length != 4) {
+            throw new IllegalArgumentException(String.format("TaskPriority : 
%s illegal.", taskPriorityInfo));
+        }
+        TaskPriority taskPriority = new TaskPriority(
+                Integer.parseInt(parts[0]),
+                Integer.parseInt(parts[1]),
+                Integer.parseInt(parts[2]),
+                Integer.parseInt(parts[3]),
+                parts[4]);
+        return taskPriority;
+    }
+}
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java
new file mode 100644
index 0000000..cccc700
--- /dev/null
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.consumer;
+
+import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.common.thread.Stopper;
+import org.apache.dolphinscheduler.common.utils.FileUtils;
+import org.apache.dolphinscheduler.common.utils.StringUtils;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.entity.Tenant;
+import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.entity.TaskPriority;
+import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher;
+import 
org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
+import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
+import 
org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
+import org.apache.dolphinscheduler.service.process.ProcessService;
+import org.apache.dolphinscheduler.service.queue.TaskUpdateQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * TaskUpdateQueue consumer
+ */
+@Component
+public class TaskUpdateQueueConsumer extends Thread{
+
+    /**
+     * logger of TaskUpdateQueueConsumer
+     */
+    private static final Logger logger = 
LoggerFactory.getLogger(TaskUpdateQueueConsumer.class);
+
+    /**
+     * taskUpdateQueue
+     */
+    @Autowired
+    private TaskUpdateQueue taskUpdateQueue;
+
+    /**
+     * processService
+     */
+    @Autowired
+    private ProcessService processService;
+
+    /**
+     * executor dispatcher
+     */
+    @Autowired
+    private ExecutorDispatcher dispatcher;
+
+    @Override
+    public void run() {
+        while (Stopper.isRunning()){
+            try {
+                if (taskUpdateQueue.size() == 0){
+                    continue;
+                }
+                String taskPriorityInfo = taskUpdateQueue.take();
+                TaskPriority taskPriority = TaskPriority.of(taskPriorityInfo);
+                dispatch(taskPriority.getTaskId());
+            }catch (Exception e){
+                logger.error("dispatcher task error",e);
+            }
+        }
+    }
+
+
+    /**
+     * TODO dispatch task
+     *
+     * @param taskInstanceId taskInstanceId
+     * @return result
+     */
+    private Boolean dispatch(int taskInstanceId){
+        TaskExecutionContext context = getTaskExecutionContext(taskInstanceId);
+        ExecutionContext executionContext = new 
ExecutionContext(context.toCommand(), ExecutorType.WORKER, 
context.getWorkerGroup());
+        try {
+            return dispatcher.dispatch(executionContext);
+        } catch (ExecuteException e) {
+            logger.error("execute exception", e);
+            return false;
+        }
+
+    }
+
+    /**
+     * get TaskExecutionContext
+     * @param taskInstanceId taskInstanceId
+     * @return TaskExecutionContext
+     */
+    protected TaskExecutionContext getTaskExecutionContext(int taskInstanceId){
+        TaskInstance taskInstance = 
processService.getTaskInstanceDetailByTaskId(taskInstanceId);
+
+        Integer userId = taskInstance.getProcessDefine() == null ? 0 : 
taskInstance.getProcessDefine().getUserId();
+        Tenant tenant = 
processService.getTenantForProcess(taskInstance.getProcessInstance().getTenantId(),
 userId);
+
+        // verify tenant is null
+        if (verifyTenantIsNull(tenant, taskInstance)) {
+            processService.changeTaskState(ExecutionStatus.FAILURE,
+                    taskInstance.getStartTime(),
+                    taskInstance.getHost(),
+                    null,
+                    null,
+                    taskInstance.getId());
+            return null;
+        }
+        // set queue for process instance, user-specified queue takes 
precedence over tenant queue
+        String userQueue = 
processService.queryUserQueueByProcessInstanceId(taskInstance.getProcessInstanceId());
+        
taskInstance.getProcessInstance().setQueue(StringUtils.isEmpty(userQueue) ? 
tenant.getQueue() : userQueue);
+        
taskInstance.getProcessInstance().setTenantCode(tenant.getTenantCode());
+        taskInstance.setExecutePath(getExecLocalPath(taskInstance));
+
+        return TaskExecutionContextBuilder.get()
+                .buildTaskInstanceRelatedInfo(taskInstance)
+                
.buildProcessInstanceRelatedInfo(taskInstance.getProcessInstance())
+                
.buildProcessDefinitionRelatedInfo(taskInstance.getProcessDefine())
+                .create();
+    }
+
+    /**
+     * get execute local path
+     *
+     * @return execute local path
+     */
+    private String getExecLocalPath(TaskInstance taskInstance){
+        return 
FileUtils.getProcessExecDir(taskInstance.getProcessDefine().getProjectId(),
+                taskInstance.getProcessDefine().getId(),
+                taskInstance.getProcessInstance().getId(),
+                taskInstance.getId());
+    }
+
+
+    /**
+     *  whehter tenant is null
+     * @param tenant tenant
+     * @param taskInstance taskInstance
+     * @return result
+     */
+    private boolean verifyTenantIsNull(Tenant tenant, TaskInstance 
taskInstance) {
+        if(tenant == null){
+            logger.error("tenant not exists,process instance id : {},task 
instance id : {}",
+                    taskInstance.getProcessInstance().getId(),
+                    taskInstance.getId());
+            return true;
+        }
+        return false;
+    }
+
+
+
+}
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
index ef2cb67..8f0b731 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
@@ -24,6 +24,7 @@ import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
 import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
+import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
 import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
 import 
org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager;
 import 
org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl;
@@ -66,12 +67,14 @@ public class TaskAckProcessor implements 
NettyRequestProcessor {
         logger.info("taskAckCommand : {}", taskAckCommand);
 
         taskInstanceCacheManager.cacheTaskInstance(taskAckCommand);
+
+        String workerAddress = ChannelUtils.toAddress(channel).getAddress();
         /**
          * change Task state
          */
         
processService.changeTaskState(ExecutionStatus.of(taskAckCommand.getStatus()),
                 taskAckCommand.getStartTime(),
-                taskAckCommand.getHost(),
+                workerAddress,
                 taskAckCommand.getExecutePath(),
                 taskAckCommand.getLogPath(),
                 taskAckCommand.getTaskInstanceId());
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
index 71bb8f8..c8b7b0e 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
@@ -17,26 +17,18 @@
 package org.apache.dolphinscheduler.server.master.runner;
 
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
-import org.apache.dolphinscheduler.common.utils.FileUtils;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.common.utils.StringUtils;
 import org.apache.dolphinscheduler.dao.AlertDao;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.dao.entity.Tenant;
-import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
-import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
-import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher;
-import 
org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
-import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
-import 
org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.dolphinscheduler.service.process.ProcessService;
-import org.apache.dolphinscheduler.service.queue.ITaskQueue;
-import org.apache.dolphinscheduler.service.queue.TaskQueueFactory;
+import org.apache.dolphinscheduler.service.queue.TaskUpdateQueue;
+import org.apache.dolphinscheduler.service.queue.TaskUpdateQueueImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import static org.apache.dolphinscheduler.common.Constants.*;
 
 import java.util.concurrent.Callable;
 
@@ -72,11 +64,6 @@ public class MasterBaseTaskExecThread implements 
Callable<Boolean> {
     protected TaskInstance taskInstance;
 
     /**
-     * task queue
-     */
-    protected ITaskQueue taskQueue;
-
-    /**
      * whether need cancel
      */
     protected boolean cancel;
@@ -86,12 +73,10 @@ public class MasterBaseTaskExecThread implements 
Callable<Boolean> {
      */
     private MasterConfig masterConfig;
 
-
     /**
-     * executor dispatcher
+     * taskUpdateQueue
      */
-    private ExecutorDispatcher dispatcher;
-
+    private TaskUpdateQueue taskUpdateQueue;
     /**
      * constructor of MasterBaseTaskExecThread
      * @param taskInstance      task instance
@@ -101,11 +86,10 @@ public class MasterBaseTaskExecThread implements 
Callable<Boolean> {
         this.processService = 
SpringApplicationContext.getBean(ProcessService.class);
         this.alertDao = SpringApplicationContext.getBean(AlertDao.class);
         this.processInstance = processInstance;
-        this.taskQueue = TaskQueueFactory.getTaskQueueInstance();
         this.cancel = false;
         this.taskInstance = taskInstance;
         this.masterConfig = 
SpringApplicationContext.getBean(MasterConfig.class);
-        this.dispatcher = 
SpringApplicationContext.getBean(ExecutorDispatcher.class);
+        this.taskUpdateQueue = new TaskUpdateQueueImpl();
     }
 
     /**
@@ -124,87 +108,6 @@ public class MasterBaseTaskExecThread implements 
Callable<Boolean> {
     }
 
     /**
-     * TODO 分发任务
-     * dispatch task to worker
-     * @param taskInstance
-     */
-    private Boolean dispatch(TaskInstance taskInstance){
-        TaskExecutionContext context = getTaskExecutionContext(taskInstance);
-        ExecutionContext executionContext = new 
ExecutionContext(context.toCommand(), ExecutorType.WORKER, 
context.getWorkerGroup());
-        try {
-            return dispatcher.dispatch(executionContext);
-        } catch (ExecuteException e) {
-            logger.error("execute exception", e);
-            return false;
-        }
-
-    }
-
-    /**
-     * get TaskExecutionContext
-     *
-     * @param taskInstance taskInstance
-     * @return TaskExecutionContext
-     */
-    protected TaskExecutionContext getTaskExecutionContext(TaskInstance 
taskInstance){
-        taskInstance = 
processService.getTaskInstanceDetailByTaskId(taskInstance.getId());
-
-        Integer userId = taskInstance.getProcessDefine() == null ? 0 : 
taskInstance.getProcessDefine().getUserId();
-        Tenant tenant = 
processService.getTenantForProcess(taskInstance.getProcessInstance().getTenantId(),
 userId);
-
-        // verify tenant is null
-        if (verifyTenantIsNull(tenant, taskInstance)) {
-            processService.changeTaskState(ExecutionStatus.FAILURE,
-                    taskInstance.getStartTime(),
-                    taskInstance.getHost(),
-                    null,
-                    null,
-                    taskInstance.getId());
-            return null;
-        }
-        // set queue for process instance, user-specified queue takes 
precedence over tenant queue
-        String userQueue = 
processService.queryUserQueueByProcessInstanceId(taskInstance.getProcessInstanceId());
-        
taskInstance.getProcessInstance().setQueue(StringUtils.isEmpty(userQueue) ? 
tenant.getQueue() : userQueue);
-        
taskInstance.getProcessInstance().setTenantCode(tenant.getTenantCode());
-        taskInstance.setExecutePath(getExecLocalPath(taskInstance));
-
-        return TaskExecutionContextBuilder.get()
-                .buildTaskInstanceRelatedInfo(taskInstance)
-                
.buildProcessInstanceRelatedInfo(taskInstance.getProcessInstance())
-                
.buildProcessDefinitionRelatedInfo(taskInstance.getProcessDefine())
-                .create();
-    }
-
-
-    /**
-     * get execute local path
-     *
-     * @return execute local path
-     */
-    private String getExecLocalPath(TaskInstance taskInstance){
-        return 
FileUtils.getProcessExecDir(taskInstance.getProcessDefine().getProjectId(),
-                taskInstance.getProcessDefine().getId(),
-                taskInstance.getProcessInstance().getId(),
-                taskInstance.getId());
-    }
-
-
-    /**
-     *  whehter tenant is null
-     * @param tenant tenant
-     * @param taskInstance taskInstance
-     * @return result
-     */
-    private boolean verifyTenantIsNull(Tenant tenant, TaskInstance 
taskInstance) {
-        if(tenant == null){
-            logger.error("tenant not exists,process instance id : {},task 
instance id : {}",
-                    taskInstance.getProcessInstance().getId(),
-                    taskInstance.getId());
-            return true;
-        }
-        return false;
-    }
-    /**
      * submit master base task exec thread
      * @return TaskInstance
      */
@@ -268,10 +171,20 @@ public class MasterBaseTaskExecThread implements 
Callable<Boolean> {
                 logger.info(String.format("submit to task, but task [%s] state 
already be running. ", taskInstance.getName()));
                 return true;
             }
-            logger.info("task ready to submit: {}" , taskInstance);
-            boolean submitTask = dispatch(taskInstance);
+            logger.info("task ready to submit: {}", taskInstance);
+
+            /**
+             *  taskPriorityInfo
+             */
+            String taskPriorityInfo = 
buildTaskPriorityInfo(processInstance.getProcessInstancePriority().getCode(),
+                    processInstance.getId(),
+                    taskInstance.getProcessInstancePriority().getCode(),
+                    taskInstance.getId(),
+                    taskInstance.getWorkerGroup());
+
+            taskUpdateQueue.put(taskPriorityInfo);
             logger.info(String.format("master submit success, task : %s", 
taskInstance.getName()) );
-            return submitTask;
+            return true;
         }catch (Exception e){
             logger.error("submit task  Exception: ", e);
             logger.error("task error : %s", JSONUtils.toJson(taskInstance));
@@ -279,6 +192,33 @@ public class MasterBaseTaskExecThread implements 
Callable<Boolean> {
         }
     }
 
+
+    /**
+     *  buildTaskPriorityInfo
+     *
+     * @param processInstancePriority processInstancePriority
+     * @param processInstanceId processInstanceId
+     * @param taskInstancePriority taskInstancePriority
+     * @param taskInstanceId taskInstanceId
+     * @param workerGroup workerGroup
+     * @return TaskPriorityInfo
+     */
+    private String buildTaskPriorityInfo(int processInstancePriority,
+                                         int processInstanceId,
+                                         int taskInstancePriority,
+                                         int taskInstanceId,
+                                         String workerGroup){
+        return processInstancePriority +
+                UNDERLINE +
+                processInstanceId +
+                UNDERLINE +
+                taskInstancePriority +
+                UNDERLINE +
+                taskInstanceId +
+                UNDERLINE +
+                workerGroup;
+    }
+
     /**
      * submit wait complete
      * @return true
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java
index fb3f8e9..1197dc2 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java
@@ -182,7 +182,10 @@ public class MasterTaskExecThread extends 
MasterBaseTaskExecThread {
         }
         alreadyKilled = true;
 
-        TaskExecutionContext taskExecutionContext = 
super.getTaskExecutionContext(taskInstance);
+        TaskExecutionContext taskExecutionContext = new TaskExecutionContext();
+        taskExecutionContext.setTaskInstanceId(taskInstance.getId());
+        taskExecutionContext.setProcessId(taskInstance.getPid());
+
         ExecutionContext executionContext = new 
ExecutionContext(taskExecutionContext.toKillCommand(), ExecutorType.WORKER, 
taskExecutionContext.getWorkerGroup());
 
         Host host = Host.of(taskInstance.getHost());
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
index cfb94b5..fb35f47 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
@@ -31,7 +31,6 @@ import 
org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor;
 import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry;
 import org.apache.dolphinscheduler.server.zk.ZKWorkerClient;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
-import org.apache.dolphinscheduler.service.queue.ITaskQueue;
 import org.apache.dolphinscheduler.service.queue.TaskQueueFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -61,10 +60,7 @@ public class WorkerServer implements IStoppable {
     @Autowired
     private ZKWorkerClient zkWorkerClient = null;
 
-    /**
-     * task queue impl
-     */
-    protected ITaskQueue taskQueue;
+
 
     /**
      *  fetch task executor service
@@ -136,7 +132,7 @@ public class WorkerServer implements IStoppable {
 
         this.zkWorkerClient.init();
 
-        this.taskQueue = TaskQueueFactory.getTaskQueueInstance();
+
 
         this.fetchTaskExecutorService = 
ThreadUtils.newDaemonSingleThreadExecutor("Worker-Fetch-Thread-Executor");
 
diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index 55cd634..ca0ed79 100644
--- 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -29,7 +29,6 @@ import org.apache.dolphinscheduler.common.utils.*;
 import org.apache.dolphinscheduler.dao.entity.*;
 import org.apache.dolphinscheduler.dao.mapper.*;
 import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
-import org.apache.dolphinscheduler.service.queue.ITaskQueue;
 import org.quartz.CronExpression;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -99,11 +98,6 @@ public class ProcessService {
     private  ProjectMapper projectMapper;
 
     /**
-     * task queue impl
-     */
-    @Autowired
-    private ITaskQueue taskQueue;
-    /**
      * handle Command (construct ProcessInstance from Command) , wrapped in 
transaction
      * @param logger logger
      * @param host host
@@ -960,40 +954,7 @@ public class ProcessService {
         return taskInstance;
     }
 
-    /**
-     * submit task to queue
-     * @param taskInstance taskInstance
-     * @return whether submit task to queue success
-     */
-    public Boolean submitTaskToQueue(TaskInstance taskInstance) {
 
-        try{
-            if(taskInstance.isSubProcess()){
-                return true;
-            }
-            if(taskInstance.getState().typeIsFinished()){
-                logger.info(String.format("submit to task queue, but task [%s] 
state [%s] is already  finished. ", taskInstance.getName(), 
taskInstance.getState().toString()));
-                return true;
-            }
-            // task cannot submit when running
-            if(taskInstance.getState() == ExecutionStatus.RUNNING_EXEUTION){
-                logger.info(String.format("submit to task queue, but task [%s] 
state already be running. ", taskInstance.getName()));
-                return true;
-            }
-            if(checkTaskExistsInTaskQueue(taskInstance)){
-                logger.info(String.format("submit to task queue, but task [%s] 
already exists in the queue.", taskInstance.getName()));
-                return true;
-            }
-            logger.info("task ready to queue: {}" , taskInstance);
-            boolean insertQueueResult = 
taskQueue.add(DOLPHINSCHEDULER_TASKS_QUEUE, taskZkInfo(taskInstance));
-            logger.info(String.format("master insert into queue success, task 
: %s", taskInstance.getName()) );
-            return insertQueueResult;
-        }catch (Exception e){
-            logger.error("submit task to queue Exception: ", e);
-            logger.error("task queue error : %s", 
JSONUtils.toJson(taskInstance));
-            return false;
-        }
-    }
 
     /**
      * 
${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskInstanceId}_${task
 executed by ip1},${ip2}...
@@ -1127,7 +1088,8 @@ public class ProcessService {
 
         String taskZkInfo = taskZkInfo(taskInstance);
 
-        return taskQueue.checkTaskExists(DOLPHINSCHEDULER_TASKS_QUEUE, 
taskZkInfo);
+//        return taskQueue.checkTaskExists(DOLPHINSCHEDULER_TASKS_QUEUE, 
taskZkInfo);
+        return false;
     }
 
     /**
diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/ITaskQueue.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/ITaskQueue.java
deleted file mode 100644
index bed8a11..0000000
--- 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/ITaskQueue.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dolphinscheduler.service.queue;
-
-import java.util.List;
-import java.util.Set;
-
-public interface ITaskQueue {
-
-    /**
-     * take out all the elements
-     *
-     *
-     * @param key
-     * @return
-     */
-    List<String> getAllTasks(String key);
-
-    /**
-     * check if has a task
-     * @param key queue name
-     * @return true if has; false if not
-     */
-    boolean hasTask(String key);
-
-    /**
-     * check task exists in the task queue or not
-     *
-     * @param key queue name
-     * @param task 
${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}
-     * @return true if exists in the queue
-     */
-    boolean checkTaskExists(String key, String task);
-
-    /**
-     * add an element to the queue
-     *
-     * @param key  queue name
-     * @param value
-     */
-    boolean add(String key, String value);
-
-    /**
-     * an element pops out of the queue
-     *
-     * @param key  queue name
-     * @param n    how many elements to poll
-     * @return
-     */
-    List<String> poll(String key, int n);
-
-    /**
-     * remove a element from queue
-     * @param key
-     * @param value
-     */
-    void removeNode(String key, String value);
-
-    /**
-     * add an element to the set
-     *
-     * @param key
-     * @param value
-     */
-    void sadd(String key, String value);
-
-    /**
-     * delete the value corresponding to the key in the set
-     *
-     * @param key
-     * @param value
-     */
-    void srem(String key, String value);
-
-    /**
-     * gets all the elements of the set based on the key
-     *
-     * @param key
-     * @return
-     */
-    Set<String> smembers(String key);
-
-
-    /**
-     * clear the task queue for use by junit tests only
-     */
-    void delete();
-}
\ No newline at end of file
diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskQueueFactory.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskQueueFactory.java
index 6be419f..3ea3195 100644
--- 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskQueueFactory.java
+++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskQueueFactory.java
@@ -40,11 +40,11 @@ public class TaskQueueFactory {
    *
    * @return instance
    */
-  public static ITaskQueue getTaskQueueInstance() {
+  public static TaskUpdateQueue getTaskQueueInstance() {
     String queueImplValue = CommonUtils.getQueueImplValue();
     if (StringUtils.isNotBlank(queueImplValue)) {
         logger.info("task queue impl use zookeeper ");
-        return SpringApplicationContext.getBean(TaskQueueZkImpl.class);
+        return SpringApplicationContext.getBean(TaskUpdateQueueImpl.class);
     }else{
       logger.error("property dolphinscheduler.queue.impl can't be blank, 
system will exit ");
       System.exit(-1);
diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskQueueZkImpl.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskQueueZkImpl.java
deleted file mode 100644
index 874512c..0000000
--- 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskQueueZkImpl.java
+++ /dev/null
@@ -1,375 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dolphinscheduler.service.queue;
-
-
-import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.common.utils.IpUtils;
-import org.apache.dolphinscheduler.common.utils.OSUtils;
-import org.apache.dolphinscheduler.service.zk.ZookeeperOperator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
-
-import java.util.*;
-
-/**
- * A singleton of a task queue implemented with zookeeper
- * tasks queue implementation
- */
-@Service
-public class TaskQueueZkImpl implements ITaskQueue {
-
-    private static final Logger logger = 
LoggerFactory.getLogger(TaskQueueZkImpl.class);
-
-    private final ZookeeperOperator zookeeperOperator;
-
-    @Autowired
-    public TaskQueueZkImpl(ZookeeperOperator zookeeperOperator) {
-        this.zookeeperOperator = zookeeperOperator;
-
-        try {
-            String tasksQueuePath = 
getTasksPath(Constants.DOLPHINSCHEDULER_TASKS_QUEUE);
-            String tasksKillPath = 
getTasksPath(Constants.DOLPHINSCHEDULER_TASKS_KILL);
-
-            for (String key : new String[]{tasksQueuePath,tasksKillPath}){
-                if (!zookeeperOperator.isExisted(key)){
-                    zookeeperOperator.persist(key, "");
-                    logger.info("create tasks queue parent node success : {}", 
key);
-                }
-            }
-        } catch (Exception e) {
-            logger.error("create tasks queue parent node failure", e);
-        }
-    }
-
-
-    /**
-     * get all tasks from tasks queue
-     * @param key   task queue name
-     * @return
-     */
-    @Override
-    public List<String> getAllTasks(String key) {
-        try {
-            List<String> list = 
zookeeperOperator.getChildrenKeys(getTasksPath(key));
-            return list;
-        } catch (Exception e) {
-            logger.error("get all tasks from tasks queue exception",e);
-        }
-        return Collections.emptyList();
-    }
-
-    /**
-     * check if has a task
-     * @param key queue name
-     * @return true if has; false if not
-     */
-    @Override
-    public boolean hasTask(String key) {
-        try {
-            return zookeeperOperator.hasChildren(getTasksPath(key));
-        } catch (Exception e) {
-            logger.error("check has task in tasks queue exception",e);
-        }
-        return false;
-    }
-
-    /**
-     * check task exists in the task queue or not
-     *
-     * @param key       queue name
-     * @param task      
${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}
-     * @return true if exists in the queue
-     */
-    @Override
-    public boolean checkTaskExists(String key, String task) {
-        String taskPath = getTasksPath(key) + Constants.SINGLE_SLASH + task;
-
-        return zookeeperOperator.isExisted(taskPath);
-
-    }
-
-
-    /**
-     * add task to tasks queue
-     *
-     * @param key      task queue name
-     * @param value    
${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}_host1,host2,...
-     */
-    @Override
-    public boolean add(String key, String value){
-        try {
-            String taskIdPath = getTasksPath(key) + Constants.SINGLE_SLASH + 
value;
-            zookeeperOperator.persist(taskIdPath, value);
-            return true;
-        } catch (Exception e) {
-            logger.error("add task to tasks queue exception",e);
-            return false;
-        }
-
-    }
-
-
-    /**
-     * An element pops out of the queue <p>
-     * note:
-     *   
${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}_host1,host2,...
-     *   The tasks with the highest priority are selected by comparing the 
priorities of the above four levels from high to low.
-     *
-     * @param  key  task queue name
-     * @param  tasksNum    how many elements to poll
-     * @return the task ids  to be executed
-     */
-    @Override
-    public List<String> poll(String key, int tasksNum) {
-        try{
-            List<String> list = 
zookeeperOperator.getChildrenKeys(getTasksPath(key));
-
-            if(list != null && list.size() > 0){
-
-                String workerIp = OSUtils.getHost();
-                String workerIpLongStr = 
String.valueOf(IpUtils.ipToLong(workerIp));
-
-                int size = list.size();
-
-                Set<String> taskTreeSet = new TreeSet<>(new 
Comparator<String>() {
-                    @Override
-                    public int compare(String o1, String o2) {
-
-                        String s1 = o1;
-                        String s2 = o2;
-                        String[] s1Array = s1.split(Constants.UNDERLINE);
-                        if(s1Array.length>4){
-                            // warning: if this length > 5, need to be changed
-                            s1 = s1.substring(0, 
s1.lastIndexOf(Constants.UNDERLINE) );
-                        }
-
-                        String[] s2Array = s2.split(Constants.UNDERLINE);
-                        if(s2Array.length>4){
-                            // warning: if this length > 5, need to be changed
-                            s2 = s2.substring(0, 
s2.lastIndexOf(Constants.UNDERLINE) );
-                        }
-
-                        return s1.compareTo(s2);
-                    }
-                });
-
-                for (int i = 0; i < size; i++) {
-
-                    String taskDetail = list.get(i);
-                    String[] taskDetailArrs = 
taskDetail.split(Constants.UNDERLINE);
-
-                    //forward compatibility
-                    if(taskDetailArrs.length >= 4){
-
-                        //format 
${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}
-                        String formatTask = String.format("%s_%010d_%s_%010d", 
taskDetailArrs[0], Long.parseLong(taskDetailArrs[1]), taskDetailArrs[2], 
Long.parseLong(taskDetailArrs[3]));
-                        if(taskDetailArrs.length > 4){
-                            String taskHosts = taskDetailArrs[4];
-
-                            //task can assign to any worker host if equals 
default ip value of worker server
-                            
if(!taskHosts.equals(String.valueOf(Constants.DEFAULT_WORKER_ID))){
-                                String[] taskHostsArr = 
taskHosts.split(Constants.COMMA);
-                                
if(!Arrays.asList(taskHostsArr).contains(workerIpLongStr)){
-                                    continue;
-                                }
-                            }
-                            formatTask += Constants.UNDERLINE + 
taskDetailArrs[4];
-                        }
-                        taskTreeSet.add(formatTask);
-                    }
-                }
-
-                List<String> tasksList = getTasksListFromTreeSet(tasksNum, 
taskTreeSet);
-
-                logger.info("consume tasks: {},there still have {} tasks need 
to be executed", Arrays.toString(tasksList.toArray()), size - tasksList.size());
-
-                return tasksList;
-            }else{
-                Thread.sleep(Constants.SLEEP_TIME_MILLIS);
-            }
-
-        } catch (Exception e) {
-            logger.error("add task to tasks queue exception",e);
-        }
-        return Collections.emptyList();
-    }
-
-
-    /**
-     * get task list from tree set
-     *
-     * @param tasksNum
-     * @param taskTreeSet
-     */
-    public List<String> getTasksListFromTreeSet(int tasksNum, Set<String> 
taskTreeSet) {
-        Iterator<String> iterator = taskTreeSet.iterator();
-        int j = 0;
-        List<String> tasksList = new ArrayList<>(tasksNum);
-        while(iterator.hasNext()){
-            if(j++ >= tasksNum){
-                break;
-            }
-            String task = iterator.next();
-            tasksList.add(getOriginTaskFormat(task));
-        }
-        return tasksList;
-    }
-
-    /**
-     * format 
${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}
-     * processInstanceId and task id need to be convert to int.
-     * @param formatTask
-     * @return
-     */
-    private String getOriginTaskFormat(String formatTask){
-        String[] taskArray = formatTask.split(Constants.UNDERLINE);
-        if(taskArray.length< 4){
-            return formatTask;
-        }
-        int processInstanceId = Integer.parseInt(taskArray[1]);
-        int taskId = Integer.parseInt(taskArray[3]);
-
-        StringBuilder sb = new StringBuilder(50);
-        String destTask = String.format("%s_%s_%s_%s", taskArray[0], 
processInstanceId, taskArray[2], taskId);
-
-        sb.append(destTask);
-
-        if(taskArray.length > 4){
-            for(int index = 4; index < taskArray.length; index++){
-                sb.append(Constants.UNDERLINE).append(taskArray[index]);
-            }
-        }
-        return sb.toString();
-    }
-
-    @Override
-    public void removeNode(String key, String nodeValue){
-
-        String tasksQueuePath = getTasksPath(key) + Constants.SINGLE_SLASH;
-        String taskIdPath = tasksQueuePath + nodeValue;
-        logger.info("removeNode task {}", taskIdPath);
-        try{
-            zookeeperOperator.remove(taskIdPath);
-
-        }catch(Exception e){
-            logger.error(String.format("delete task:%s from zookeeper fail, 
exception:" ,nodeValue) ,e);
-        }
-
-    }
-
-
-
-    /**
-     * In order to be compatible with redis implementation
-     *
-     * To be compatible with the redis implementation, add an element to the 
set
-     * @param key   The key is the kill/cancel queue path name
-     * @param value host-taskId  The name of the zookeeper node
-     */
-    @Override
-    public void sadd(String key,String value) {
-        try {
-
-            if(value != null && value.trim().length() > 0){
-                String path = getTasksPath(key) + Constants.SINGLE_SLASH;
-                if(!zookeeperOperator.isExisted(path + value)){
-                    zookeeperOperator.persist(path + value,value);
-                    logger.info("add task:{} to tasks set ",value);
-                } else{
-                    logger.info("task {} exists in tasks set ",value);
-                }
-
-            }else{
-                logger.warn("add host-taskId:{} to tasks set is empty ",value);
-            }
-
-        } catch (Exception e) {
-            logger.error("add task to tasks set exception",e);
-        }
-    }
-
-
-    /**
-     * delete the value corresponding to the key in the set
-     * @param key   The key is the kill/cancel queue path name
-     * @param value host-taskId-taskType The name of the zookeeper node
-     */
-    @Override
-    public void srem(String key, String value) {
-        try{
-            String path = getTasksPath(key) + Constants.SINGLE_SLASH;
-            zookeeperOperator.remove(path + value);
-
-        }catch(Exception e){
-            logger.error(String.format("delete task:" + value + " 
exception"),e);
-        }
-    }
-
-
-    /**
-     * Gets all the elements of the set based on the key
-     * @param key  The key is the kill/cancel queue path name
-     * @return
-     */
-    @Override
-    public Set<String> smembers(String key) {
-        try {
-            List<String> list = 
zookeeperOperator.getChildrenKeys(getTasksPath(key));
-            return new HashSet<>(list);
-        } catch (Exception e) {
-            logger.error("get all tasks from tasks queue exception",e);
-        }
-        return Collections.emptySet();
-    }
-
-    /**
-     * Clear the task queue of zookeeper node
-     */
-    @Override
-    public void delete(){
-        try {
-            String tasksQueuePath = 
getTasksPath(Constants.DOLPHINSCHEDULER_TASKS_QUEUE);
-            String tasksKillPath = 
getTasksPath(Constants.DOLPHINSCHEDULER_TASKS_KILL);
-
-            for (String key : new String[]{tasksQueuePath,tasksKillPath}){
-                if (zookeeperOperator.isExisted(key)){
-                    List<String> list = zookeeperOperator.getChildrenKeys(key);
-                    for (String task : list) {
-                        zookeeperOperator.remove(key + Constants.SINGLE_SLASH 
+ task);
-                        logger.info("delete task from tasks queue : {}/{} ", 
key, task);
-                    }
-                }
-            }
-
-        } catch (Exception e) {
-            logger.error("delete all tasks in tasks queue failure", e);
-        }
-    }
-
-    /**
-     * Get the task queue path
-     * @param key  task queue name
-     * @return
-     */
-    public String getTasksPath(String key){
-        return zookeeperOperator.getZookeeperConfig().getDsRoot() + 
Constants.SINGLE_SLASH + key;
-    }
-
-}
diff --git 
a/dolphinscheduler-service/src/test/java/queue/BaseTaskQueueTest.java 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskUpdateQueue.java
similarity index 52%
rename from dolphinscheduler-service/src/test/java/queue/BaseTaskQueueTest.java
rename to 
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskUpdateQueue.java
index a0cc457..48f510e 100644
--- a/dolphinscheduler-service/src/test/java/queue/BaseTaskQueueTest.java
+++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskUpdateQueue.java
@@ -14,35 +14,31 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package queue;
+package org.apache.dolphinscheduler.service.queue;
 
-import org.apache.dolphinscheduler.service.queue.ITaskQueue;
-import org.apache.dolphinscheduler.service.queue.TaskQueueFactory;
-import org.junit.*;
 
-/**
- * base task queue test for only start zk server once
- */
-@Ignore
-public class BaseTaskQueueTest {
+public interface TaskUpdateQueue {
 
-    protected static ITaskQueue tasksQueue = null;
+    /**
+     * put task info
+     *
+     * @param taskInfo taskInfo
+     * @throws Exception
+     */
+    void put(String taskInfo) throws Exception;
 
-    @BeforeClass
-    public static void setup() {
-        ZKServer.start();
-        tasksQueue = TaskQueueFactory.getTaskQueueInstance();
-        //clear all data
-        tasksQueue.delete();
-    }
+    /**
+     * take taskInfo
+     * @return taskInfo
+     * @throws Exception
+     */
+    String take()throws Exception;
 
-    @AfterClass
-    public static void tearDown() {
-        tasksQueue.delete();
-        ZKServer.stop();
-    }
-    @Test
-    public void tasksQueueNotNull(){
-        Assert.assertNotNull(tasksQueue);
-    }
-}
+    /**
+     * size
+     *
+     * @return size
+     * @throws Exception
+     */
+    int size() throws Exception;
+}
\ No newline at end of file
diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskUpdateQueueImpl.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskUpdateQueueImpl.java
new file mode 100644
index 0000000..fda5a4c
--- /dev/null
+++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskUpdateQueueImpl.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.service.queue;
+
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+
+import java.util.*;
+import java.util.concurrent.PriorityBlockingQueue;
+
+import static org.apache.dolphinscheduler.common.Constants.*;
+
+/**
+ * A singleton of a task queue implemented with zookeeper
+ * tasks queue implementation
+ */
+@Service
+public class TaskUpdateQueueImpl implements TaskUpdateQueue {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(TaskUpdateQueueImpl.class);
+
+    /**
+     * queue size
+     */
+    private static final Integer QUEUE_MAX_SIZE = 100;
+
+    /**
+     * queue
+     */
+    private PriorityBlockingQueue<String> queue = new 
PriorityBlockingQueue<>(QUEUE_MAX_SIZE, new TaskInfoComparator());
+
+    /**
+     * put task takePriorityInfo
+     *
+     * @param taskPriorityInfo takePriorityInfo
+     * @throws Exception
+     */
+    @Override
+    public void put(String taskPriorityInfo) throws Exception {
+
+        if (QUEUE_MAX_SIZE.equals(queue.size())){
+            //TODO need persist db , then load from db to queue when queue 
size is zero
+            logger.error("queue is full...");
+            return;
+        }
+        queue.put(taskPriorityInfo);
+    }
+
+    /**
+     * take taskInfo
+     * @return taskInfo
+     * @throws Exception
+     */
+    @Override
+    public String take() throws Exception {
+        return queue.take();
+    }
+
+    /**
+     * queue size
+     * @return size
+     * @throws Exception
+     */
+    @Override
+    public int size() throws Exception {
+        return queue.size();
+    }
+
+    /**
+     * TaskInfoComparator
+     */
+    private class TaskInfoComparator implements Comparator<String>{
+
+        /**
+         * compare o1 o2
+         * @param o1 o1
+         * @param o2 o2
+         * @return compare result
+         */
+        @Override
+        public int compare(String o1, String o2) {
+            String s1 = o1;
+            String s2 = o2;
+            String[] s1Array = s1.split(UNDERLINE);
+            if(s1Array.length > TASK_INFO_LENGTH){
+                // warning: if this length > 5, need to be changed
+                s1 = s1.substring(0, s1.lastIndexOf(UNDERLINE) );
+            }
+
+            String[] s2Array = s2.split(UNDERLINE);
+            if(s2Array.length > TASK_INFO_LENGTH){
+                // warning: if this length > 5, need to be changed
+                s2 = s2.substring(0, s2.lastIndexOf(UNDERLINE) );
+            }
+
+            return s1.compareTo(s2);
+        }
+    }
+}
diff --git 
a/dolphinscheduler-service/src/test/java/queue/TaskQueueZKImplTest.java 
b/dolphinscheduler-service/src/test/java/queue/TaskQueueZKImplTest.java
deleted file mode 100644
index d29c5aa..0000000
--- a/dolphinscheduler-service/src/test/java/queue/TaskQueueZKImplTest.java
+++ /dev/null
@@ -1,229 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package queue;
-
-import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.common.utils.IpUtils;
-import org.apache.dolphinscheduler.common.utils.OSUtils;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import java.util.List;
-import java.util.Random;
-
-import static org.junit.Assert.*;
-
-/**
- * task queue test
- */
-@Ignore
-public class TaskQueueZKImplTest extends BaseTaskQueueTest  {
-
-    @Before
-    public void before(){
-
-        //clear all data
-        tasksQueue.delete();
-    }
-
-    @After
-    public void after(){
-        //clear all data
-        tasksQueue.delete();
-    }
-
-    /**
-     * test take out all the elements
-     */
-    @Test
-    public  void getAllTasks(){
-
-        //add
-        init();
-        // get all
-        List<String> allTasks = 
tasksQueue.getAllTasks(Constants.DOLPHINSCHEDULER_TASKS_QUEUE);
-        assertEquals(allTasks.size(),2);
-        //delete all
-        tasksQueue.delete();
-        allTasks = 
tasksQueue.getAllTasks(Constants.DOLPHINSCHEDULER_TASKS_QUEUE);
-        assertEquals(allTasks.size(),0);
-    }
-    @Test
-    public void hasTask(){
-        init();
-        boolean hasTask = 
tasksQueue.hasTask(Constants.DOLPHINSCHEDULER_TASKS_QUEUE);
-        assertTrue(hasTask);
-        //delete all
-        tasksQueue.delete();
-        hasTask = tasksQueue.hasTask(Constants.DOLPHINSCHEDULER_TASKS_QUEUE);
-        assertFalse(hasTask);
-    }
-    /**
-     * test check task exists in the task queue or not
-     */
-    @Test
-    public  void checkTaskExists(){
-
-        String task= "1_0_1_1_-1";
-        //add
-        init();
-        // check Exist true
-        boolean taskExists = 
tasksQueue.checkTaskExists(Constants.DOLPHINSCHEDULER_TASKS_QUEUE, task);
-        assertTrue(taskExists);
-
-        //remove task
-        tasksQueue.removeNode(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,task);
-        // check Exist false
-        taskExists = 
tasksQueue.checkTaskExists(Constants.DOLPHINSCHEDULER_TASKS_QUEUE, task);
-        assertFalse(taskExists);
-    }
-
-    /**
-     * test add  element to the queue
-     */
-    @Test
-    public void add(){
-
-        //add
-        tasksQueue.add(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,"1_0_1_1_-1");
-        tasksQueue.add(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,"0_1_1_1_-1");
-        tasksQueue.add(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,"0_0_0_1_" + 
IpUtils.ipToLong(OSUtils.getHost()));
-        tasksQueue.add(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,"1_2_1_1_" + 
IpUtils.ipToLong(OSUtils.getHost()) + 10);
-
-        List<String> tasks = 
tasksQueue.poll(Constants.DOLPHINSCHEDULER_TASKS_QUEUE, 1);
-
-        if(tasks.size() <= 0){
-            return;
-        }
-
-        //pop
-        String node1 = tasks.get(0);
-        assertEquals(node1,"0_0_0_1_" + IpUtils.ipToLong(OSUtils.getHost()));
-    }
-
-    /**
-     *  test  element pops out of the queue
-     */
-    @Test
-    public  void poll(){
-        
-        //add
-        init();
-        List<String> taskList = 
tasksQueue.poll(Constants.DOLPHINSCHEDULER_TASKS_QUEUE, 2);
-        assertEquals(taskList.size(),2);
-
-        assertEquals(taskList.get(0),"0_1_1_1_-1");
-        assertEquals(taskList.get(1),"1_0_1_1_-1");
-    }
-
-    /**
-     * test remove  element from queue
-     */
-    @Test
-    public void removeNode(){
-        String task = "1_0_1_1_-1";
-        //add
-        init();
-        tasksQueue.removeNode(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,task);
-        
assertFalse(tasksQueue.checkTaskExists(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,task));
-    }
-
-    /**
-     * test add an element to the set
-     */
-    @Test
-    public void sadd(){
-
-        String task = "1_0_1_1_-1";
-        tasksQueue.sadd(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,task);
-        //check size
-        
assertEquals(tasksQueue.smembers(Constants.DOLPHINSCHEDULER_TASKS_QUEUE).size(),1);
-    }
-
-
-    /**
-     * test delete the value corresponding to the key in the set
-     */
-    @Test
-    public void srem(){
-
-        String task = "1_0_1_1_-1";
-        tasksQueue.sadd(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,task);
-        //check size
-        
assertEquals(tasksQueue.smembers(Constants.DOLPHINSCHEDULER_TASKS_QUEUE).size(),1);
-        //remove and get size
-        tasksQueue.srem(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,task);
-        
assertEquals(tasksQueue.smembers(Constants.DOLPHINSCHEDULER_TASKS_QUEUE).size(),0);
-    }
-
-    /**
-     * test gets all the elements of the set based on the key
-     */
-    @Test
-    public void smembers(){
-
-        //first init
-        
assertEquals(tasksQueue.smembers(Constants.DOLPHINSCHEDULER_TASKS_QUEUE).size(),0);
-        //add
-        String task = "1_0_1_1_-1";
-        tasksQueue.sadd(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,task);
-        //check size
-        
assertEquals(tasksQueue.smembers(Constants.DOLPHINSCHEDULER_TASKS_QUEUE).size(),1);
-        //add
-        task = "0_1_1_1_";
-        tasksQueue.sadd(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,task);
-        //check size
-        
assertEquals(tasksQueue.smembers(Constants.DOLPHINSCHEDULER_TASKS_QUEUE).size(),2);
-    }
-
-
-    /**
-     * init data
-     */
-    private void init(){
-        //add
-        tasksQueue.add(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,"1_0_1_1_-1");
-        tasksQueue.add(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,"0_1_1_1_-1");
-    }
-
-
-
-    /**
-     * test one million data from zookeeper queue
-     */
-    @Ignore
-    @Test
-    public void extremeTest(){
-        int total = 30 * 10000;
-
-        for(int i = 0; i < total; i++) {
-            for(int j = 0; j < total; j++) {
-                
//${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}
-                //format 
${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}
-                String formatTask = String.format("%s_%d_%s_%d", i, i + 1, j, 
j == 0 ?  0 : j + new Random().nextInt(100));
-                tasksQueue.add(Constants.DOLPHINSCHEDULER_TASKS_QUEUE, 
formatTask);
-            }
-        }
-
-        String node1 = tasksQueue.poll(Constants.DOLPHINSCHEDULER_TASKS_QUEUE, 
1).get(0);
-        assertEquals(node1,"0");
-
-    }
-
-}
diff --git 
a/dolphinscheduler-service/src/test/java/queue/TaskUpdateQueueTest.java 
b/dolphinscheduler-service/src/test/java/queue/TaskUpdateQueueTest.java
new file mode 100644
index 0000000..a0e4fad
--- /dev/null
+++ b/dolphinscheduler-service/src/test/java/queue/TaskUpdateQueueTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package queue;
+
+import org.apache.dolphinscheduler.service.queue.TaskUpdateQueue;
+import org.apache.dolphinscheduler.service.queue.TaskUpdateQueueImpl;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class TaskUpdateQueueTest {
+
+    /**
+     * test put
+     */
+    @Test
+    public void testQueue() throws Exception{
+
+        // 
${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}_${groupName}
+
+        /**
+         * 1_1_2_1_default
+         * 1_1_2_2_default
+         * 1_1_0_3_default
+         * 1_1_0_4_default
+         */
+
+        String taskInfo1 = "1_1_2_1_default";
+        String taskInfo2 = "1_1_2_2_default";
+        String taskInfo3 = "1_1_0_3_default";
+        String taskInfo4 = "1_1_0_4_default";
+
+        TaskUpdateQueue queue = new TaskUpdateQueueImpl();
+        queue.put(taskInfo1);
+        queue.put(taskInfo2);
+        queue.put(taskInfo3);
+        queue.put(taskInfo4);
+
+        assertEquals("1_1_0_3_default", queue.take());
+        assertEquals("1_1_0_4_default", queue.take());
+        assertEquals("1_1_2_1_default",queue.take());
+        assertEquals("1_1_2_2_default",queue.take());
+    }
+}

Reply via email to