This is an automated email from the ASF dual-hosted git repository.

journey pushed a commit to branch refactor-worker
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/refactor-worker by this push:
     new e467910  Refactor worker (#1997)
e467910 is described below

commit e4679109af40f874b0579ac8f719fc90a1a95998
Author: Tboy <[email protected]>
AuthorDate: Sat Feb 22 21:39:48 2020 +0800

    Refactor worker (#1997)
    
    * Refactor worker (#2)
    
    * Refactor worker (#1993)
    
    * Refactor worker (#1)
    
    * add TaskResponseProcessor (#1983)
    
    * 1. master persistent task 2. extract master and worker communication model (#1992)
    
    * 1. master persistent task
    2. extract master and worker communication model
    
    * 1. master persistent task
    2. extract master and worker communication model
    
    * 1. master persistent task
    2. extract master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    Co-authored-by: qiaozhanwei <[email protected]>
    
    * updates
    
    Co-authored-by: qiaozhanwei <[email protected]>
    
    * TaskExecutionContext create modify (#1994)
    
    * 1. master persistent task
    2. extract master and worker communication model
    
    * 1. master persistent task
    2. extract master and worker communication model
    
    * 1. master persistent task
    2. extract master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    * TaskExecutionContext create modify
    
    Co-authored-by: qiaozhanwei <[email protected]>
    
    * updates
    
    Co-authored-by: qiaozhanwei <[email protected]>
---
 .../dolphinscheduler/common/thread/Stopper.java    |   2 +-
 .../remote/NettyRemotingClient.java                |  92 +++++++++----
 .../remote/command/ExecuteTaskAckCommand.java      |   2 +-
 .../remote/command/ExecuteTaskRequestCommand.java  |   2 +-
 .../remote/command/ExecuteTaskResponseCommand.java |   2 +-
 .../remote/entity/TaskExecutionContext.java        |  15 ++-
 .../remote/handler/NettyClientHandler.java         | 103 ++++++++++-----
 .../remote/handler/NettyServerHandler.java         |   2 +-
 .../dolphinscheduler/remote/utils/Address.java     |  96 --------------
 .../remote/utils/ChannelUtils.java                 |   4 +-
 .../dolphinscheduler/remote/utils}/Host.java       |  13 +-
 .../remote/NettyRemotingClientTest.java            |   6 +-
 .../builder/TaskExecutionContextBuilder.java       |   2 +-
 .../server/master/MasterServer.java                |   3 +-
 .../server/master/dispatch/ExecutorDispatcher.java |  77 +++++++++++
 .../context/ExecutionContext.java}                 |  38 +++---
 .../enums/ExecutorType.java}                       |  10 +-
 .../dispatch/exceptions/ExecuteException.java      |  95 ++++++++++++++
 .../executor/AbstractExecutorManager.java}         |  17 ++-
 .../executor/ExecutorManager.java}                 |  12 +-
 .../dispatch/executor/NettyExecutorManager.java    | 144 +++++++++++++++++++++
 .../master/{ => dispatch}/host/HostManager.java    |   7 +-
 .../{ => dispatch}/host/RoundRobinHostManager.java |  34 +++--
 .../{ => dispatch}/host/assign/RandomSelector.java |   2 +-
 .../host/assign/RoundRobinSelector.java            |   6 +-
 .../{ => dispatch}/host/assign/Selector.java       |   2 +-
 ...esponseProcessor.java => TaskAckProcessor.java} |  33 ++---
 .../master/processor/TaskResponseProcessor.java    |   2 -
 .../master/runner/MasterBaseTaskExecThread.java    |  53 ++++----
 .../server/master/runner/MasterExecThread.java     |   4 -
 .../server/registry/ZookeeperNodeManager.java      |   2 +-
 .../server/registry/ZookeeperRegistryCenter.java   |   2 +-
 .../worker/processor/TaskCallbackService.java      |   5 +-
 .../worker/processor/WorkerRequestProcessor.java   |   8 +-
 .../server/worker/runner/TaskScheduleThread.java   |  22 ++--
 .../service/log/LogClientService.java              |   9 +-
 36 files changed, 625 insertions(+), 303 deletions(-)

diff --git 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/Stopper.java
 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/Stopper.java
index cad6914..67c2c81 100644
--- 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/Stopper.java
+++ 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/Stopper.java
@@ -34,6 +34,6 @@ public class Stopper {
        }
        
        public static final void stop(){
-               signal.getAndSet(true);
+               signal.set(true);
        }
 }
diff --git 
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java
 
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java
index 96258d7..4c5b365 100644
--- 
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java
+++ 
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java
@@ -25,6 +25,7 @@ import io.netty.channel.socket.nio.NioSocketChannel;
 import org.apache.dolphinscheduler.remote.codec.NettyDecoder;
 import org.apache.dolphinscheduler.remote.codec.NettyEncoder;
 import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
 import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
 import org.apache.dolphinscheduler.remote.exceptions.RemotingTimeoutException;
@@ -33,7 +34,8 @@ import 
org.apache.dolphinscheduler.remote.future.InvokeCallback;
 import org.apache.dolphinscheduler.remote.future.ReleaseSemaphore;
 import org.apache.dolphinscheduler.remote.future.ResponseFuture;
 import org.apache.dolphinscheduler.remote.handler.NettyClientHandler;
-import org.apache.dolphinscheduler.remote.utils.Address;
+import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
+import org.apache.dolphinscheduler.remote.utils.Host;
 import org.apache.dolphinscheduler.remote.utils.CallerThreadExecutePolicy;
 import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
 import org.slf4j.Logger;
@@ -64,7 +66,7 @@ public class NettyRemotingClient {
     /**
      * channels
      */
-    private final ConcurrentHashMap<Address, Channel> channels = new 
ConcurrentHashMap(128);
+    private final ConcurrentHashMap<Host, Channel> channels = new 
ConcurrentHashMap(128);
 
     /**
      *  started flag
@@ -158,17 +160,17 @@ public class NettyRemotingClient {
 
     /**
      *  async send
-     * @param address address
+     * @param host host
      * @param command command
      * @param timeoutMillis timeoutMillis
      * @param invokeCallback callback function
      * @throws InterruptedException
      * @throws RemotingException
      */
-    public void sendAsync(final Address address, final Command command,
+    public void sendAsync(final Host host, final Command command,
                           final long timeoutMillis,
                           final InvokeCallback invokeCallback) throws 
InterruptedException, RemotingException {
-        final Channel channel = getChannel(address);
+        final Channel channel = getChannel(host);
         if (channel == null) {
             throw new RemotingException("network error");
         }
@@ -214,7 +216,7 @@ public class NettyRemotingClient {
                 });
             } catch (Throwable ex){
                 responseFuture.release();
-                throw new RemotingException(String.format("send command to 
address: %s failed", address), ex);
+                throw new RemotingException(String.format("send command to 
host: %s failed", host), ex);
             }
         } else{
             String message = String.format("try to acquire async semaphore 
timeout: %d, waiting thread num: %d, total permits: %d",
@@ -225,17 +227,17 @@ public class NettyRemotingClient {
 
     /**
      * sync send
-     * @param address address
+     * @param host host
      * @param command command
      * @param timeoutMillis timeoutMillis
      * @return command
      * @throws InterruptedException
      * @throws RemotingException
      */
-    public Command sendSync(final Address address, final Command command, 
final long timeoutMillis) throws InterruptedException, RemotingException {
-        final Channel channel = getChannel(address);
+    public Command sendSync(final Host host, final Command command, final long 
timeoutMillis) throws InterruptedException, RemotingException {
+        final Channel channel = getChannel(host);
         if (channel == null) {
-            throw new RemotingException(String.format("connect to : %s fail", 
address));
+            throw new RemotingException(String.format("connect to : %s fail", 
host));
         }
         final long opaque = command.getOpaque();
         final ResponseFuture responseFuture = new ResponseFuture(opaque, 
timeoutMillis, null, null);
@@ -250,7 +252,7 @@ public class NettyRemotingClient {
                 }
                 responseFuture.setCause(future.cause());
                 responseFuture.putResponse(null);
-                logger.error("send command {} to address {} failed", command, 
address);
+                logger.error("send command {} to host {} failed", command, 
host);
             }
         });
         /**
@@ -259,49 +261,89 @@ public class NettyRemotingClient {
         Command result = responseFuture.waitResponse();
         if(result == null){
             if(responseFuture.isSendOK()){
-                throw new RemotingTimeoutException(address.toString(), 
timeoutMillis, responseFuture.getCause());
+                throw new RemotingTimeoutException(host.toString(), 
timeoutMillis, responseFuture.getCause());
             } else{
-                throw new RemotingException(address.toString(), 
responseFuture.getCause());
+                throw new RemotingException(host.toString(), 
responseFuture.getCause());
             }
         }
         return result;
     }
 
+    public void send(final Host host, final Command command) throws 
RemotingException {
+        Channel channel = getChannel(host);
+        if (channel == null) {
+            throw new RemotingException(String.format("connect to : %s fail", 
host));
+        }
+        try {
+            ChannelFuture future = channel.writeAndFlush(command).await();
+            if (future.isSuccess()) {
+                logger.debug("send command : {} , to : {} successfully.", 
command, host.getAddress());
+            } else {
+                String msg = String.format("send command : %s , to :%s 
failed", command, host.getAddress());
+                logger.error(msg, future.cause());
+                throw new RemotingException(msg);
+            }
+        } catch (Exception e) {
+            logger.error("Send command {} to address {} encounter error.", 
command, host.getAddress());
+            throw new RemotingException(String.format("Send command : %s , to 
:%s encounter error", command, host.getAddress()), e);
+        }
+    }
+
+    /**
+     *  register processor
+     * @param commandType command type
+     * @param processor processor
+     */
+    public void registerProcessor(final CommandType commandType, final 
NettyRequestProcessor processor) {
+        this.registerProcessor(commandType, processor, null);
+    }
+
+    /**
+     *  register processor
+     *
+     * @param commandType command type
+     * @param processor processor
+     * @param executor thread executor
+     */
+    public void registerProcessor(final CommandType commandType, final 
NettyRequestProcessor processor, final ExecutorService executor) {
+        this.clientHandler.registerProcessor(commandType, processor, executor);
+    }
+
     /**
      *  get channel
-     * @param address
+     * @param host
      * @return
      */
-    public Channel getChannel(Address address) {
-        Channel channel = channels.get(address);
+    public Channel getChannel(Host host) {
+        Channel channel = channels.get(host);
         if(channel != null && channel.isActive()){
             return channel;
         }
-        return createChannel(address, true);
+        return createChannel(host, true);
     }
 
     /**
      * create channel
-     * @param address address
+     * @param host host
      * @param isSync sync flag
      * @return channel
      */
-    public Channel createChannel(Address address, boolean isSync) {
+    public Channel createChannel(Host host, boolean isSync) {
         ChannelFuture future;
         try {
             synchronized (bootstrap){
-                future = bootstrap.connect(new 
InetSocketAddress(address.getHost(), address.getPort()));
+                future = bootstrap.connect(new InetSocketAddress(host.getIp(), 
host.getPort()));
             }
             if(isSync){
                 future.sync();
             }
             if (future.isSuccess()) {
                 Channel channel = future.channel();
-                channels.put(address, channel);
+                channels.put(host, channel);
                 return channel;
             }
         } catch (Exception ex) {
-            logger.info("connect to {} error  {}", address, ex);
+            logger.info("connect to {} error  {}", host, ex);
         }
         return null;
     }
@@ -341,10 +383,10 @@ public class NettyRemotingClient {
 
     /**
      * close channel
-     * @param address address
+     * @param host host
      */
-    public void closeChannel(Address address){
-        Channel channel = this.channels.remove(address);
+    public void closeChannel(Host host){
+        Channel channel = this.channels.remove(host);
         if(channel != null){
             channel.close();
         }
diff --git 
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskAckCommand.java
 
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskAckCommand.java
index 7f7da0e..8c50a25 100644
--- 
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskAckCommand.java
+++ 
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskAckCommand.java
@@ -1 +1 @@
-/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dolphinscheduler.remote.command;

import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;

import java.io.Serializable;
import java.util.Date;

/**
 *  execute task 
 request command
 */
public class ExecuteTaskAckCommand implements Serializable {

    private int taskInstanceId;

    private Date startTime;

    private String host;

    private int status;

    private String logPath;

    private String executePath;

    public Date getStartTime() {
        return startTime;
    }

    public void setStartTime(Date startTime) {
        this.startTime = startTime;
    }

    public String getHost() {
        return host;
    }

    public void setHost(String host) {
        this.host = host;
    }

    public int getStatus() {
        return status;
    }

    public void setStatus(int status) {
        this.status = status;
    }

    public int getTaskInstanceId() {
        return taskInstanceId;
    }

    public void setTaskInstanceId(int taskInstanceId) {
        this.taskInstanceId = taskInstanceId;
    }

    public String getLogPath() {
        return logPath;
    }

    public void setLogPath(String logPath) {
        this.logPath = lo
 gPath;
    }

    public String getExecutePath() {
        return executePath;
    }

    public void setExecutePath(String executePath) {
        this.executePath = executePath;
    }

    /**
     *  package request command
     *
     * @return command
     */
    public Command convert2Command(long opaque){
        Command command = new Command(opaque);
        command.setType(CommandType.EXECUTE_TASK_ACK);
        byte[] body = FastJsonSerializer.serialize(this);
        command.setBody(body);
        return command;
    }

    @Override
    public String toString() {
        return "ExecuteTaskAckCommand{" +
                "taskInstanceId=" + taskInstanceId +
                ", startTime=" + startTime +
                ", host='" + host + '\'' +
                ", status=" + status +
                ", logPath='" + logPath + '\'' +
                ", executePath='" + executePath + '\'' +
                '}';
    }
}
\ No newline at end of file
+/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dolphinscheduler.remote.command;

import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;

import java.io.Serializable;
import java.util.Date;

/**
 *  execute task 
 request command
 */
public class ExecuteTaskAckCommand implements Serializable {

    private int taskInstanceId;

    private Date startTime;

    private String host;

    private int status;

    private String logPath;

    private String executePath;

    public Date getStartTime() {
        return startTime;
    }

    public void setStartTime(Date startTime) {
        this.startTime = startTime;
    }

    public String getHost() {
        return host;
    }

    public void setHost(String host) {
        this.host = host;
    }

    public int getStatus() {
        return status;
    }

    public void setStatus(int status) {
        this.status = status;
    }

    public int getTaskInstanceId() {
        return taskInstanceId;
    }

    public void setTaskInstanceId(int taskInstanceId) {
        this.taskInstanceId = taskInstanceId;
    }

    public String getLogPath() {
        return logPath;
    }

    public void setLogPath(String logPath) {
        this.logPath = lo
 gPath;
    }

    public String getExecutePath() {
        return executePath;
    }

    public void setExecutePath(String executePath) {
        this.executePath = executePath;
    }

    /**
     *  package request command
     *
     * @return command
     */
    public Command convert2Command(){
        Command command = new Command();
        command.setType(CommandType.EXECUTE_TASK_ACK);
        byte[] body = FastJsonSerializer.serialize(this);
        command.setBody(body);
        return command;
    }

    @Override
    public String toString() {
        return "ExecuteTaskAckCommand{" +
                "taskInstanceId=" + taskInstanceId +
                ", startTime=" + startTime +
                ", host='" + host + '\'' +
                ", status=" + status +
                ", logPath='" + logPath + '\'' +
                ", executePath='" + executePath + '\'' +
                '}';
    }
}
\ No newline at end of file
diff --git 
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java
 
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java
index e1556f3..e7564ed 100644
--- 
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java
+++ 
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java
@@ -1 +1 @@
-/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dolphinscheduler.remote.command;

import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;

import java.io.Serializable;

/**
 *  execute task request command
 */
pub
 lic class ExecuteTaskRequestCommand implements Serializable {

    /**
     *  task instance json
     */
    private String taskInfoJson;

    public String getTaskInfoJson() {
        return taskInfoJson;
    }

    public void setTaskInfoJson(String taskInfoJson) {
        this.taskInfoJson = taskInfoJson;
    }

    public ExecuteTaskRequestCommand() {
    }

    public ExecuteTaskRequestCommand(String taskInfoJson) {
        this.taskInfoJson = taskInfoJson;
    }

    /**
     *  package request command
     *
     * @return command
     */
    public Command convert2Command(){
        Command command = new Command();
        command.setType(CommandType.EXECUTE_TASK_REQUEST);
        byte[] body = FastJsonSerializer.serialize(this);
        command.setBody(body);
        return command;
    }

    @Override
    public String toString() {
        return "ExecuteTaskRequestCommand{" +
                "taskInfoJson='" + taskInfoJson + '\'' +
                '}';
    }
}
\ No newline at end of file
+/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dolphinscheduler.remote.command;

import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;

import java.io.Serializable;

/**
 *  execute task request command
 */
pub
 lic class ExecuteTaskRequestCommand implements Serializable {

    /**
     *  task execution context
     */
    private String taskExecutionContext;

    public String getTaskExecutionContext() {
        return taskExecutionContext;
    }

    public void setTaskExecutionContext(String taskExecutionContext) {
        this.taskExecutionContext = taskExecutionContext;
    }

    public ExecuteTaskRequestCommand() {
    }

    public ExecuteTaskRequestCommand(String taskExecutionContext) {
        this.taskExecutionContext = taskExecutionContext;
    }

    /**
     *  package request command
     *
     * @return command
     */
    public Command convert2Command(){
        Command command = new Command();
        command.setType(CommandType.EXECUTE_TASK_REQUEST);
        byte[] body = FastJsonSerializer.serialize(this);
        command.setBody(body);
        return command;
    }

    @Override
    public String toString() {
        return "ExecuteTaskRequestCommand{" +
           
      "taskExecutionContext='" + taskExecutionContext + '\'' +
                '}';
    }
}
\ No newline at end of file
diff --git 
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskResponseCommand.java
 
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskResponseCommand.java
index 8193c9d..6bbc2f7 100644
--- 
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskResponseCommand.java
+++ 
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskResponseCommand.java
@@ -1 +1 @@
-/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dolphinscheduler.remote.command;

import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;

import java.io.Serializable;
import java.util.Date;

/**
 *  execute task 
 response command
 */
public class ExecuteTaskResponseCommand implements Serializable {


    public ExecuteTaskResponseCommand() {
    }

    public ExecuteTaskResponseCommand(int taskInstanceId) {
        this.taskInstanceId = taskInstanceId;
    }

    /**
     *  task instance id
     */
    private int taskInstanceId;

    /**
     *  status
     */
    private int status;


    /**
     *  end time
     */
    private Date endTime;


    public int getTaskInstanceId() {
        return taskInstanceId;
    }

    public void setTaskInstanceId(int taskInstanceId) {
        this.taskInstanceId = taskInstanceId;
    }

    public int getStatus() {
        return status;
    }

    public void setStatus(int status) {
        this.status = status;
    }

    public Date getEndTime() {
        return endTime;
    }

    public void setEndTime(Date endTime) {
        this.endTime = endTime;
    }

    /**
     * package response command
     *
     * @param opaque request unique identif
 ication
     * @return command
     */
    public Command convert2Command(long opaque){
        Command command = new Command(opaque);
        command.setType(CommandType.EXECUTE_TASK_RESPONSE);
        byte[] body = FastJsonSerializer.serialize(this);
        command.setBody(body);
        return command;
    }

}
\ No newline at end of file
+/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dolphinscheduler.remote.command;

import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;

import java.io.Serializable;
import java.util.Date;

/**
 *  execute task 
 response command
 */
public class ExecuteTaskResponseCommand implements Serializable {


    public ExecuteTaskResponseCommand() {
    }

    public ExecuteTaskResponseCommand(int taskInstanceId) {
        this.taskInstanceId = taskInstanceId;
    }

    /**
     *  task instance id
     */
    private int taskInstanceId;

    /**
     *  status
     */
    private int status;


    /**
     *  end time
     */
    private Date endTime;


    public int getTaskInstanceId() {
        return taskInstanceId;
    }

    public void setTaskInstanceId(int taskInstanceId) {
        this.taskInstanceId = taskInstanceId;
    }

    public int getStatus() {
        return status;
    }

    public void setStatus(int status) {
        this.status = status;
    }

    public Date getEndTime() {
        return endTime;
    }

    public void setEndTime(Date endTime) {
        this.endTime = endTime;
    }

    /**
     * package response command
     * @return command
     */
    public Command 
 convert2Command(){
        Command command = new Command();
        command.setType(CommandType.EXECUTE_TASK_RESPONSE);
        byte[] body = FastJsonSerializer.serialize(this);
        command.setBody(body);
        return command;
    }

}
\ No newline at end of file
diff --git 
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/entity/TaskExecutionContext.java
 
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/entity/TaskExecutionContext.java
index 783d166..e3da43a 100644
--- 
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/entity/TaskExecutionContext.java
+++ 
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/entity/TaskExecutionContext.java
@@ -26,9 +26,9 @@ import java.util.Date;
 public class TaskExecutionContext implements Serializable{
 
     /**
-     *  task instance id
+     *  task id
      */
-    private Integer taskId;
+    private Integer taskInstanceId;
 
 
     /**
@@ -107,12 +107,13 @@ public class TaskExecutionContext implements Serializable{
      */
     private Integer projectId;
 
-    public Integer getTaskId() {
-        return taskId;
+
+    public Integer getTaskInstanceId() {
+        return taskInstanceId;
     }
 
-    public void setTaskId(Integer taskId) {
-        this.taskId = taskId;
+    public void setTaskInstanceId(Integer taskInstanceId) {
+        this.taskInstanceId = taskInstanceId;
     }
 
     public String getTaskName() {
@@ -230,7 +231,7 @@ public class TaskExecutionContext implements Serializable{
     @Override
     public String toString() {
         return "TaskExecutionContext{" +
-                "taskId=" + taskId +
+                "taskInstanceId=" + taskInstanceId +
                 ", taskName='" + taskName + '\'' +
                 ", startTime=" + startTime +
                 ", taskType='" + taskType + '\'' +
diff --git 
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java
 
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java
index d5d0d4d..48d78d9 100644
--- 
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java
+++ 
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java
@@ -19,12 +19,19 @@ package org.apache.dolphinscheduler.remote.handler;
 import io.netty.channel.*;
 import org.apache.dolphinscheduler.remote.NettyRemotingClient;
 import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.remote.future.ResponseFuture;
+import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
 import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
+import org.apache.dolphinscheduler.remote.utils.Constants;
+import org.apache.dolphinscheduler.remote.utils.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
 
 /**
  *  netty client request handler
@@ -44,9 +51,20 @@ public class NettyClientHandler extends 
ChannelInboundHandlerAdapter {
      */
     private final ExecutorService callbackExecutor;
 
+    /**
+     * processors
+     */
+    private final ConcurrentHashMap<CommandType, Pair<NettyRequestProcessor, 
ExecutorService>> processors;
+
+    /**
+     *  default executor
+     */
+    private final ExecutorService defaultExecutor = 
Executors.newFixedThreadPool(Constants.CPUS);
+
     public NettyClientHandler(NettyRemotingClient nettyRemotingClient, 
ExecutorService callbackExecutor){
         this.nettyRemotingClient = nettyRemotingClient;
         this.callbackExecutor = callbackExecutor;
+        this.processors = new ConcurrentHashMap();
     }
 
     /**
@@ -71,18 +89,43 @@ public class NettyClientHandler extends 
ChannelInboundHandlerAdapter {
      */
     @Override
     public void channelRead(ChannelHandlerContext ctx, Object msg) throws 
Exception {
-        processReceived((Command)msg);
+        processReceived(ctx.channel(), (Command)msg);
+    }
+
+    /**
+     * register processor
+     *
+     * @param commandType command type
+     * @param processor processor
+     */
+    public void registerProcessor(final CommandType commandType, final 
NettyRequestProcessor processor) {
+        this.registerProcessor(commandType, processor, null);
+    }
+
+    /**
+     *  register processor
+     *
+     * @param commandType command type
+     * @param processor processor
+     * @param executor thread executor
+     */
+    public void registerProcessor(final CommandType commandType, final 
NettyRequestProcessor processor, final ExecutorService executor) {
+        ExecutorService executorRef = executor;
+        if(executorRef == null){
+            executorRef = defaultExecutor;
+        }
+        this.processors.putIfAbsent(commandType, new Pair<>(processor, 
executorRef));
     }
 
     /**
      *  process received logic
      *
-     * @param responseCommand responseCommand
+     * @param command command
      */
-    private void processReceived(final Command responseCommand) {
-        ResponseFuture future = 
ResponseFuture.getFuture(responseCommand.getOpaque());
+    private void processReceived(final Channel channel, final Command command) 
{
+        ResponseFuture future = ResponseFuture.getFuture(command.getOpaque());
         if(future != null){
-            future.setResponseCommand(responseCommand);
+            future.setResponseCommand(command);
             future.release();
             if(future.getInvokeCallback() != null){
                 this.callbackExecutor.submit(new Runnable() {
@@ -92,10 +135,30 @@ public class NettyClientHandler extends 
ChannelInboundHandlerAdapter {
                     }
                 });
             } else{
-                future.putResponse(responseCommand);
+                future.putResponse(command);
             }
         } else{
-            logger.warn("receive response {}, but not matched any request ", 
responseCommand);
+            processByCommandType(channel, command);
+        }
+    }
+
+    public void processByCommandType(final Channel channel, final Command 
command) {
+        final Pair<NettyRequestProcessor, ExecutorService> pair = 
processors.get(command.getType());
+        if (pair != null) {
+            Runnable run = () -> {
+                try {
+                    pair.getLeft().process(channel, command);
+                } catch (Throwable e) {
+                    logger.error(String.format("process command %s exception", 
command), e);
+                }
+            };
+            try {
+                pair.getRight().submit(run);
+            } catch (RejectedExecutionException e) {
+                logger.warn("thread pool is full, discard command {} from {}", 
command, ChannelUtils.getRemoteAddress(channel));
+            }
+        } else {
+            logger.warn("receive response {}, but not matched any request ", 
command);
         }
     }
 
@@ -112,30 +175,4 @@ public class NettyClientHandler extends 
ChannelInboundHandlerAdapter {
         ctx.channel().close();
     }
 
-    /**
-     *  channel write changed
-     *
-     * @param ctx channel handler context
-     * @throws Exception
-     */
-    @Override
-    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws 
Exception {
-        Channel ch = ctx.channel();
-        ChannelConfig config = ch.config();
-
-        if (!ch.isWritable()) {
-            if (logger.isWarnEnabled()) {
-                logger.warn("{} is not writable, over high water level : {}",
-                        new Object[]{ch, 
config.getWriteBufferHighWaterMark()});
-            }
-
-            config.setAutoRead(false);
-        } else {
-            if (logger.isWarnEnabled()) {
-                logger.warn("{} is writable, to low water : {}",
-                        new Object[]{ch, config.getWriteBufferLowWaterMark()});
-            }
-            config.setAutoRead(true);
-        }
-    }
 }
\ No newline at end of file
diff --git 
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyServerHandler.java
 
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyServerHandler.java
index eabd656..2a4f784 100644
--- 
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyServerHandler.java
+++ 
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyServerHandler.java
@@ -98,7 +98,7 @@ public class NettyServerHandler extends 
ChannelInboundHandlerAdapter {
         if(executorRef == null){
             executorRef = nettyRemotingServer.getDefaultExecutor();
         }
-        this.processors.putIfAbsent(commandType, new 
Pair<NettyRequestProcessor, ExecutorService>(processor, executorRef));
+        this.processors.putIfAbsent(commandType, new Pair<>(processor, 
executorRef));
     }
 
     /**
diff --git 
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Address.java
 
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Address.java
deleted file mode 100644
index f61dcd6..0000000
--- 
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Address.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dolphinscheduler.remote.utils;
-
-import java.io.Serializable;
-
-/**
- *  server address
- */
-public class Address implements Serializable {
-
-    /**
-     *  host
-     */
-    private String host;
-
-    /**
-     *  port
-     */
-    private int port;
-
-    public Address(){
-        //NOP
-    }
-
-    public Address(String host, int port){
-        this.host = host;
-        this.port = port;
-    }
-
-    public String getHost() {
-        return host;
-    }
-
-    public void setHost(String host) {
-        this.host = host;
-    }
-
-    public int getPort() {
-        return port;
-    }
-
-    public void setPort(int port) {
-        this.port = port;
-    }
-
-    @Override
-    public int hashCode() {
-        final int prime = 31;
-        int result = 1;
-        result = prime * result + ((host == null) ? 0 : host.hashCode());
-        result = prime * result + port;
-        return result;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        if (this == obj) {
-            return true;
-        }
-        if (obj == null) {
-            return false;
-        }
-        if (getClass() != obj.getClass()) {
-            return false;
-        }
-        Address other = (Address) obj;
-        if (host == null) {
-            if (other.host != null) {
-                return false;
-            }
-        } else if (!host.equals(other.host)) {
-            return false;
-        }
-        return port == other.port;
-    }
-
-    @Override
-    public String toString() {
-        return "Address [host=" + host + ", port=" + port + "]";
-    }
-}
diff --git 
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/ChannelUtils.java
 
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/ChannelUtils.java
index d7af5fe..138a8f0 100644
--- 
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/ChannelUtils.java
+++ 
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/ChannelUtils.java
@@ -49,9 +49,9 @@ public class ChannelUtils {
      * @param channel channel
      * @return address
      */
-    public static Address toAddress(Channel channel){
+    public static Host toAddress(Channel channel){
         InetSocketAddress socketAddress = 
((InetSocketAddress)channel.remoteAddress());
-        return new Address(socketAddress.getAddress().getHostAddress(), 
socketAddress.getPort());
+        return new Host(socketAddress.getAddress().getHostAddress(), 
socketAddress.getPort());
     }
 
 }
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/Host.java
 
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java
similarity index 90%
rename from 
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/Host.java
rename to 
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java
index 57e64c1..f53c611 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/Host.java
+++ 
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java
@@ -14,14 +14,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.dolphinscheduler.remote.utils;
 
-package org.apache.dolphinscheduler.server.master.host;
-
-
+import java.io.Serializable;
 import java.util.Objects;
 
-
-public class Host {
+/**
+ *  server address
+ */
+public class Host implements Serializable {
 
     private String address;
 
@@ -67,7 +68,7 @@ public class Host {
     public static Host of(String address){
         String[] parts = address.split(":");
         if (parts.length != 2) {
-            throw new IllegalArgumentException(String.format("Address : %s 
illegal.", address));
+            throw new IllegalArgumentException(String.format("Host : %s 
illegal.", address));
         }
         Host host = new Host(parts[0], Integer.parseInt(parts[1]));
         return host;
diff --git 
a/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/NettyRemotingClientTest.java
 
b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/NettyRemotingClientTest.java
index ef46c2c..cfc10b2 100644
--- 
a/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/NettyRemotingClientTest.java
+++ 
b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/NettyRemotingClientTest.java
@@ -27,7 +27,7 @@ import 
org.apache.dolphinscheduler.remote.config.NettyServerConfig;
 import org.apache.dolphinscheduler.remote.future.InvokeCallback;
 import org.apache.dolphinscheduler.remote.future.ResponseFuture;
 import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
-import org.apache.dolphinscheduler.remote.utils.Address;
+import org.apache.dolphinscheduler.remote.utils.Host;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -62,7 +62,7 @@ public class NettyRemotingClientTest {
         NettyRemotingClient client = new NettyRemotingClient(clientConfig);
         Command commandPing = Ping.create();
         try {
-            Command response = client.sendSync(new Address("127.0.0.1", 
serverConfig.getListenPort()), commandPing, 2000);
+            Command response = client.sendSync(new Host("127.0.0.1", 
serverConfig.getListenPort()), commandPing, 2000);
             Assert.assertEquals(commandPing.getOpaque(), response.getOpaque());
         } catch (Exception e) {
             e.printStackTrace();
@@ -93,7 +93,7 @@ public class NettyRemotingClientTest {
         Command commandPing = Ping.create();
         try {
             final AtomicLong opaque = new AtomicLong(0);
-            client.sendAsync(new Address("127.0.0.1", 
serverConfig.getListenPort()), commandPing, 2000, new InvokeCallback() {
+            client.sendAsync(new Host("127.0.0.1", 
serverConfig.getListenPort()), commandPing, 2000, new InvokeCallback() {
                 @Override
                 public void operationComplete(ResponseFuture responseFuture) {
                     opaque.set(responseFuture.getOpaque());
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
index cafd894..a3ddd29 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
@@ -40,7 +40,7 @@ public class TaskExecutionContextBuilder {
      * @return TaskExecutionContextBuilder
      */
     public TaskExecutionContextBuilder 
buildTaskInstanceRelatedInfo(TaskInstance taskInstance){
-        taskExecutionContext.setTaskId(taskInstance.getId());
+        taskExecutionContext.setTaskInstanceId(taskInstance.getId());
         taskExecutionContext.setTaskName(taskInstance.getName());
         taskExecutionContext.setStartTime(taskInstance.getStartTime());
         taskExecutionContext.setTaskType(taskInstance.getTaskType());
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
index d0c7bc2..9493b72 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
@@ -27,6 +27,7 @@ import org.apache.dolphinscheduler.remote.NettyRemotingServer;
 import org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor;
 import 
org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor;
 import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerThread;
 import org.apache.dolphinscheduler.server.zk.ZKMasterClient;
@@ -106,7 +107,6 @@ public class MasterServer implements IStoppable {
     public static void main(String[] args) {
         Thread.currentThread().setName(Constants.THREAD_NAME_MASTER_SERVER);
         new 
SpringApplicationBuilder(MasterServer.class).web(WebApplicationType.NONE).run(args);
-
     }
 
     /**
@@ -121,6 +121,7 @@ public class MasterServer implements IStoppable {
         serverConfig.setListenPort(45678);
         this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
         
this.nettyRemotingServer.registerProcessor(CommandType.EXECUTE_TASK_RESPONSE, 
new TaskResponseProcessor(processService));
+        
this.nettyRemotingServer.registerProcessor(CommandType.EXECUTE_TASK_ACK, new 
TaskAckProcessor(processService));
         this.nettyRemotingServer.start();
 
         //
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java
new file mode 100644
index 0000000..2fd303a
--- /dev/null
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.dispatch;
+
+
+import org.apache.dolphinscheduler.common.utils.StringUtils;
+import org.apache.dolphinscheduler.remote.utils.Host;
+import 
org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
+import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
+import 
org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
+import 
org.apache.dolphinscheduler.server.master.dispatch.executor.ExecutorManager;
+import 
org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
+import 
org.apache.dolphinscheduler.server.master.dispatch.host.RoundRobinHostManager;
+import org.springframework.beans.factory.InitializingBean;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+@Service
+public class ExecutorDispatcher implements InitializingBean {
+
+    @Autowired
+    private NettyExecutorManager nettyExecutorManager;
+
+    @Autowired
+    private RoundRobinHostManager hostManager;
+
+    private final ConcurrentHashMap<ExecutorType, ExecutorManager> 
executorManagers;
+
+    public ExecutorDispatcher(){
+        this.executorManagers = new ConcurrentHashMap<>();
+    }
+
+    public void dispatch(final ExecutionContext executeContext) throws 
ExecuteException {
+        ExecutorManager executorManager = 
this.executorManagers.get(executeContext.getExecutorType());
+        if(executorManager == null){
+            throw new ExecuteException("no ExecutorManager for type : " + 
executeContext.getExecutorType());
+        }
+        Host host = hostManager.select(executeContext);
+        if (StringUtils.isEmpty(host.getAddress())) {
+            throw new ExecuteException(String.format("fail to execute : %s due 
to no worker ", executeContext.getContext()));
+        }
+        executeContext.setHost(host);
+        executorManager.beforeExecute(executeContext);
+        try {
+            executorManager.execute(executeContext);
+        } finally {
+            executorManager.afterExecute(executeContext);
+        }
+    }
+
+    @Override
+    public void afterPropertiesSet() throws Exception {
+        register(ExecutorType.WORKER, nettyExecutorManager);
+        register(ExecutorType.CLIENT, nettyExecutorManager);
+    }
+
+    public void register(ExecutorType type, ExecutorManager executorManager){
+        executorManagers.put(type, executorManager);
+    }
+}
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/assign/RandomSelector.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java
similarity index 52%
copy from 
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/assign/RandomSelector.java
copy to 
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java
index 3a3f123..4bccba0 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/assign/RandomSelector.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java
@@ -14,32 +14,38 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.dolphinscheduler.server.master.dispatch.context;
 
-package org.apache.dolphinscheduler.server.master.host.assign;
 
-import java.util.Collection;
-import java.util.Random;
+import org.apache.dolphinscheduler.remote.utils.Host;
+import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
 
+public class ExecutionContext {
 
-public class RandomSelector<T> implements Selector<T> {
+    private Host host;
 
-    private final Random random = new Random();
+    private final Object context;
 
-    @Override
-    public T select(final Collection<T> source) {
+    private final ExecutorType executorType;
 
-        if (source == null || source.size() == 0) {
-            throw new IllegalArgumentException("Empty source.");
-        }
+    public ExecutionContext(Object context, ExecutorType executorType) {
+        this.context = context;
+        this.executorType = executorType;
+    }
 
-        if (source.size() == 1) {
-            return (T) source.toArray()[0];
-        }
+    public ExecutorType getExecutorType() {
+        return executorType;
+    }
 
-        int size = source.size();
-        int randomIndex = random.nextInt(size);
+    public Object getContext() {
+        return context;
+    }
 
-        return (T) source.toArray()[randomIndex];
+    public Host getHost() {
+        return host;
     }
 
+    public void setHost(Host host) {
+        this.host = host;
+    }
 }
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/HostManager.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/enums/ExecutorType.java
similarity index 79%
copy from 
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/HostManager.java
copy to 
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/enums/ExecutorType.java
index 316ce36..70aaeae 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/HostManager.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/enums/ExecutorType.java
@@ -14,14 +14,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.dolphinscheduler.server.master.dispatch.enums;
 
-package org.apache.dolphinscheduler.server.master.host;
 
+public enum ExecutorType {
 
-import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
-
-public interface HostManager {
-
-    Host select(TaskExecutionContext context);
+    WORKER,
 
+    CLIENT;
 }
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/exceptions/ExecuteException.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/exceptions/ExecuteException.java
new file mode 100644
index 0000000..d8ca50a
--- /dev/null
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/exceptions/ExecuteException.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.dispatch.exceptions;
+
+
+public class ExecuteException extends Exception{
+
+    public ExecuteException() {
+        super();
+    }
+
+    /**
+     * Constructs a new exception with the specified detail message.  The
+     * cause is not initialized, and may subsequently be initialized by
+     * a call to {@link #initCause}.
+     *
+     * @param   message   the detail message. The detail message is saved for
+     *          later retrieval by the {@link #getMessage()} method.
+     */
+    public ExecuteException(String message) {
+        super(message);
+    }
+
+    /**
+     * Constructs a new exception with the specified detail message and
+     * cause.  <p>Note that the detail message associated with
+     * {@code cause} is <i>not</i> automatically incorporated in
+     * this exception's detail message.
+     *
+     * @param  message the detail message (which is saved for later retrieval
+     *         by the {@link #getMessage()} method).
+     * @param  cause the cause (which is saved for later retrieval by the
+     *         {@link #getCause()} method).  (A <tt>null</tt> value is
+     *         permitted, and indicates that the cause is nonexistent or
+     *         unknown.)
+     * @since  1.4
+     */
+    public ExecuteException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    /**
+     * Constructs a new exception with the specified cause and a detail
+     * message of <tt>(cause==null ? null : cause.toString())</tt> (which
+     * typically contains the class and detail message of <tt>cause</tt>).
+     * This constructor is useful for exceptions that are little more than
+     * wrappers for other throwables (for example, {@link
+     * java.security.PrivilegedActionException}).
+     *
+     * @param  cause the cause (which is saved for later retrieval by the
+     *         {@link #getCause()} method).  (A <tt>null</tt> value is
+     *         permitted, and indicates that the cause is nonexistent or
+     *         unknown.)
+     * @since  1.4
+     */
+    public ExecuteException(Throwable cause) {
+        super(cause);
+    }
+
+    /**
+     * Constructs a new exception with the specified detail message,
+     * cause, suppression enabled or disabled, and writable stack
+     * trace enabled or disabled.
+     *
+     * @param  message the detail message.
+     * @param cause the cause.  (A {@code null} value is permitted,
+     * and indicates that the cause is nonexistent or unknown.)
+     * @param enableSuppression whether or not suppression is enabled
+     *                          or disabled
+     * @param writableStackTrace whether or not the stack trace should
+     *                           be writable
+     * @since 1.7
+     */
+    protected ExecuteException(String message, Throwable cause,
+                               boolean enableSuppression,
+                               boolean writableStackTrace) {
+        super(message, cause, enableSuppression, writableStackTrace);
+    }
+
+}
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/HostManager.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/AbstractExecutorManager.java
similarity index 57%
copy from 
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/HostManager.java
copy to 
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/AbstractExecutorManager.java
index 316ce36..65ed15e 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/HostManager.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/AbstractExecutorManager.java
@@ -15,13 +15,22 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.server.master.host;
+package org.apache.dolphinscheduler.server.master.dispatch.executor;
 
+import 
org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
+import 
org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
 
-import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
 
-public interface HostManager {
+public abstract class AbstractExecutorManager implements ExecutorManager{
 
-    Host select(TaskExecutionContext context);
+    @Override
+    public void beforeExecute(ExecutionContext executeContext) throws 
ExecuteException {
+        //TODO add time monitor
+    }
 
+    @Override
+    public void afterExecute(ExecutionContext executeContext) throws 
ExecuteException {
+        //TODO add dispatch monitor
+
+    }
 }
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/HostManager.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/ExecutorManager.java
similarity index 61%
copy from 
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/HostManager.java
copy to 
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/ExecutorManager.java
index 316ce36..98d391e 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/HostManager.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/ExecutorManager.java
@@ -15,13 +15,17 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.server.master.host;
+package org.apache.dolphinscheduler.server.master.dispatch.executor;
 
+import 
org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
+import 
org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
 
-import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
 
-public interface HostManager {
+/**
+ * Contract for handing an {@link ExecutionContext} to an executor.
+ * NOTE(review): the hook names suggest the caller runs beforeExecute,
+ * execute and afterExecute in that order - confirm against the dispatcher.
+ */
+public interface ExecutorManager {
 
-    Host select(TaskExecutionContext context);
+    /**
+     * Hook intended to run ahead of {@link #execute(ExecutionContext)}.
+     *
+     * @param executeContext execution context
+     * @throws ExecuteException if the hook fails
+     */
+    void beforeExecute(ExecutionContext executeContext) throws 
ExecuteException;
 
+    /**
+     * Executes the given context.
+     *
+     * @param executeContext execution context
+     * @throws ExecuteException if the context could not be executed
+     */
+    void execute(ExecutionContext executeContext) throws ExecuteException;
+
+    /**
+     * Hook intended to run after {@link #execute(ExecutionContext)}.
+     *
+     * @param executeContext execution context
+     * @throws ExecuteException if the hook fails
+     */
+    void afterExecute(ExecutionContext executeContext) throws ExecuteException;
 }
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
new file mode 100644
index 0000000..dac8d79
--- /dev/null
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.dispatch.executor;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.dolphinscheduler.remote.NettyRemotingClient;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand;
+import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
+import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
+import org.apache.dolphinscheduler.remote.utils.Host;
+import 
org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
+import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
+import 
org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
+import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+
+/**
+ * {@link ExecutorManager} that sends an {@link ExecutionContext} to a host
+ * through a {@link NettyRemotingClient}. When the host already set on the
+ * context cannot be reached, the remaining known nodes are tried one by one.
+ */
+@Service
+public class NettyExecutorManager extends AbstractExecutorManager{
+
+    /**
+     * extra send attempts made against one host after the first attempt fails
+     */
+    private static final int SEND_RETRY_TIMES = 3;
+
+    /**
+     * pause between two send attempts to the same host, in milliseconds
+     */
+    private static final long SEND_RETRY_INTERVAL_MS = 100L;
+
+    /**
+     * message of the exception thrown once no node is left to try
+     */
+    private static final String ALL_NODES_FAILED = "fail after try all nodes";
+
+    private final Logger logger = LoggerFactory.getLogger(NettyExecutorManager.class);
+
+    /**
+     * source of the worker node addresses used for fail-over
+     */
+    @Autowired
+    private ZookeeperNodeManager zookeeperNodeManager;
+
+    /**
+     * netty client used to send commands
+     */
+    private final NettyRemotingClient nettyRemotingClient;
+
+    /**
+     * creates the manager with a netty client built from a default {@link NettyClientConfig}
+     */
+    public NettyExecutorManager(){
+        final NettyClientConfig clientConfig = new NettyClientConfig();
+        this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
+    }
+
+    /**
+     * Sends the command built from the context to the context's host and, if
+     * that host fails, to every other known node until one accepts it.
+     * On success the host that accepted the command is stored on the context.
+     *
+     * @param executeContext execution context carrying the payload and the preferred host
+     * @throws ExecuteException when every candidate node failed; the last
+     *         failure is attached as the cause
+     */
+    @Override
+    public void execute(ExecutionContext executeContext) throws ExecuteException {
+        Set<String> allNodes = getAllNodes(executeContext);
+        Set<String> failNodeSet = new HashSet<>();
+        Command command = buildCommand(executeContext);
+        Host host = executeContext.getHost();
+        // terminates: every failed round adds a node to failNodeSet, and
+        // nextHost throws once no untried node is left
+        while (true) {
+            try {
+                doExecute(host, command);
+                executeContext.setHost(host);
+                return;
+            } catch (ExecuteException ex) {
+                logger.error(String.format("execute context : %s error", executeContext.getContext()), ex);
+                host = nextHost(host, allNodes, failNodeSet, ex);
+                logger.error("retry execute context : {} host : {}", executeContext.getContext(), host);
+            }
+        }
+    }
+
+    /**
+     * Records the failed host and picks a node that has not failed yet.
+     *
+     * @param failedHost host that just failed
+     * @param allNodes every known node address
+     * @param failNodeSet addresses that already failed; updated by this call
+     * @param cause failure of the last send, attached when no node is left
+     * @return a host that has not been tried yet
+     * @throws ExecuteException when no untried node remains or one cannot be resolved
+     */
+    private Host nextHost(Host failedHost, Set<String> allNodes, Set<String> failNodeSet,
+                          ExecuteException cause) throws ExecuteException {
+        try {
+            failNodeSet.add(failedHost.getAddress());
+            for (String node : allNodes) {
+                if (!failNodeSet.contains(node)) {
+                    return Host.of(node);
+                }
+            }
+        } catch (RuntimeException e) {
+            // a missing host or an unparsable node address must still reach
+            // the caller as ExecuteException, as it did before
+            ExecuteException wrapped = failure(ALL_NODES_FAILED, e);
+            wrapped.addSuppressed(cause);
+            throw wrapped;
+        }
+        throw failure(ALL_NODES_FAILED, cause);
+    }
+
+    /**
+     * Builds the remoting command for the context.
+     * WORKER contexts carry their {@link TaskExecutionContext} serialized as
+     * JSON; CLIENT contexts produce a request without a payload.
+     *
+     * @param context execution context
+     * @return command ready to be sent
+     * @throws IllegalArgumentException for an executor type that is not handled
+     */
+    private Command buildCommand(ExecutionContext context) {
+        ExecuteTaskRequestCommand requestCommand = new ExecuteTaskRequestCommand();
+        ExecutorType executorType = context.getExecutorType();
+        switch (executorType){
+            case WORKER:
+                TaskExecutionContext taskExecutionContext = (TaskExecutionContext)context.getContext();
+                requestCommand.setTaskExecutionContext(FastJsonSerializer.serializeToString(taskExecutionContext));
+                break;
+            case CLIENT:
+                break;
+            default:
+                throw new IllegalArgumentException("invalid executor type : " + executorType);
+
+        }
+        return requestCommand.convert2Command();
+    }
+
+    /**
+     * Sends the command to one host, retrying a few times with a short pause
+     * between attempts. An interrupt received while pausing stops the retries
+     * and is restored on the current thread.
+     *
+     * @param host target host
+     * @param command command to send
+     * @throws ExecuteException when no attempt succeeded; the last send error
+     *         is attached as the cause
+     */
+    private void doExecute(final Host host, final Command command) throws ExecuteException {
+        Exception lastError = null;
+        for (int attempt = 0; attempt <= SEND_RETRY_TIMES; attempt++) {
+            if (attempt > 0) {
+                // pause only between attempts, never after the last one
+                try {
+                    Thread.sleep(SEND_RETRY_INTERVAL_MS);
+                } catch (InterruptedException interrupted) {
+                    Thread.currentThread().interrupt();
+                    break;
+                }
+            }
+            try {
+                nettyRemotingClient.send(host, command);
+                return;
+            } catch (Exception ex) {
+                logger.error(String.format("send command : %s to %s error", command, host), ex);
+                lastError = ex;
+            }
+        }
+        throw failure(String.format("send command : %s to %s error", command, host), lastError);
+    }
+
+    /**
+     * Returns the node addresses usable for the context's executor type:
+     * the worker nodes for WORKER, an empty set for CLIENT.
+     *
+     * @param context execution context
+     * @return candidate node addresses
+     * @throws IllegalArgumentException for an executor type that is not handled
+     */
+    private Set<String> getAllNodes(ExecutionContext context){
+        Set<String> nodes = Collections.emptySet();
+        ExecutorType executorType = context.getExecutorType();
+        switch (executorType){
+            case WORKER:
+                nodes = zookeeperNodeManager.getWorkerNodes();
+                break;
+            case CLIENT:
+                break;
+            default:
+                throw new IllegalArgumentException("invalid executor type : " + executorType);
+
+        }
+        return nodes;
+    }
+
+    /**
+     * Creates an {@link ExecuteException} with the given message and cause.
+     *
+     * @param message exception message
+     * @param cause underlying failure, may be null
+     * @return the exception, ready to be thrown
+     */
+    private static ExecuteException failure(String message, Throwable cause) {
+        ExecuteException failure = new ExecuteException(message);
+        failure.initCause(cause);
+        return failure;
+    }
+}
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/HostManager.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/HostManager.java
similarity index 77%
rename from 
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/HostManager.java
rename to 
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/HostManager.java
index 316ce36..8708273 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/HostManager.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/HostManager.java
@@ -15,13 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.server.master.host;
+package org.apache.dolphinscheduler.server.master.dispatch.host;
 
 
-import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.remote.utils.Host;
+import 
org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
 
+/**
+ * Chooses the host an {@link ExecutionContext} should be sent to.
+ */
 public interface HostManager {
 
-    Host select(TaskExecutionContext context);
+    /**
+     * Selects a host for the given execution context.
+     *
+     * @param context execution context to find a host for
+     * @return the selected host
+     */
+    Host select(ExecutionContext context);
 
 }
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/RoundRobinHostManager.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManager.java
similarity index 62%
rename from 
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/RoundRobinHostManager.java
rename to 
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManager.java
index 18a4659..1c222b8 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/RoundRobinHostManager.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManager.java
@@ -15,11 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.server.master.host;
+package org.apache.dolphinscheduler.server.master.dispatch.host;
 
 import org.apache.dolphinscheduler.common.utils.CollectionUtils;
-import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
-import 
org.apache.dolphinscheduler.server.master.host.assign.RoundRobinSelector;
+import org.apache.dolphinscheduler.remote.utils.Host;
+import 
org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
+import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
+import 
org.apache.dolphinscheduler.server.master.dispatch.host.assign.RoundRobinSelector;
+import org.apache.dolphinscheduler.server.master.dispatch.host.assign.Selector;
 import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,20 +40,35 @@ public class RoundRobinHostManager implements HostManager {
     private final Logger logger = 
LoggerFactory.getLogger(RoundRobinHostManager.class);
 
     @Autowired
-    private RoundRobinSelector<Host> selector;
-
-    @Autowired
     private ZookeeperNodeManager zookeeperNodeManager;
 
+    private final Selector<Host> selector;
+
+    public RoundRobinHostManager(){
+        this.selector = new RoundRobinSelector<>();
+    }
+
+    /**
+     * Selects a host for the context through the configured selector.
+     * For WORKER contexts the candidates are the worker nodes reported by
+     * the ZookeeperNodeManager. CLIENT contexts have no candidates yet, so
+     * they get the empty {@code new Host()} that is also returned whenever
+     * no node is available.
+     *
+     * @param context execution context
+     * @return the selected host, or an empty Host when there is no candidate
+     * @throws IllegalArgumentException for an executor type that is not handled
+     */
     @Override
-    public Host select(TaskExecutionContext context){
+    public Host select(ExecutionContext context){
         Host host = new Host();
-        Collection<String> nodes = zookeeperNodeManager.getWorkerNodes();
+        Collection<String> nodes = null;
+        ExecutorType executorType = context.getExecutorType();
+        switch (executorType){
+            case WORKER:
+                nodes = zookeeperNodeManager.getWorkerNodes();
+                break;
+            case CLIENT:
+                break;
+            default:
+                throw new IllegalArgumentException("invalid executorType : " + 
executorType);
+
+        }
+        // nodes is still null for CLIENT; presumably CollectionUtils.isEmpty
+        // is null-safe - TODO confirm
         if(CollectionUtils.isEmpty(nodes)){
             return host;
         }
         List<Host> candidateHosts = new ArrayList<>(nodes.size());
         nodes.stream().forEach(node -> candidateHosts.add(Host.of(node)));
+
         return selector.select(candidateHosts);
     }
 
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/assign/RandomSelector.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelector.java
similarity index 95%
rename from 
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/assign/RandomSelector.java
rename to 
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelector.java
index 3a3f123..cf8c0e8 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/assign/RandomSelector.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelector.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.server.master.host.assign;
+package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
 
 import java.util.Collection;
 import java.util.Random;
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/assign/RoundRobinSelector.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelector.java
similarity index 91%
rename from 
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/assign/RoundRobinSelector.java
rename to 
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelector.java
index d342296..90319de 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/assign/RoundRobinSelector.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelector.java
@@ -14,12 +14,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.dolphinscheduler.server.master.host.assign;
+package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
+
+import org.springframework.stereotype.Service;
 
 import java.util.Collection;
 import java.util.concurrent.atomic.AtomicInteger;
 
-
+@Service
 public class RoundRobinSelector<T> implements Selector<T> {
 
     private final AtomicInteger index = new AtomicInteger(0);
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/assign/Selector.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/Selector.java
similarity index 92%
rename from 
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/assign/Selector.java
rename to 
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/Selector.java
index c6772f3..bd7c4ac 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/assign/Selector.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/Selector.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.server.master.host.assign;
+package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
 
 import java.util.Collection;
 
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
similarity index 59%
copy from 
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
copy to 
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
index c3b6a05..1103b23 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
@@ -22,45 +22,40 @@ import 
org.apache.dolphinscheduler.common.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.common.utils.Preconditions;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
-import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
+import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand;
 import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
 import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
-import org.apache.dolphinscheduler.server.master.future.TaskFuture;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- *  task response processor
+ *  task ack processor
  */
-public class TaskResponseProcessor implements NettyRequestProcessor {
+public class TaskAckProcessor implements NettyRequestProcessor {
 
-    private final Logger logger = 
LoggerFactory.getLogger(TaskResponseProcessor.class);
+    private final Logger logger = 
LoggerFactory.getLogger(TaskAckProcessor.class);
 
     /**
      * process service
      */
     private final ProcessService processService;
 
-    public TaskResponseProcessor(ProcessService processService){
+    public TaskAckProcessor(ProcessService processService){
         this.processService = processService;
     }
 
-    /**
-     * task final result response
-     * need master process , state persistence
-     *
-     * @param channel channel
-     * @param command command
-     */
+    /**
+     * Handles a task ack command.
+     * Checks that the command type is EXECUTE_TASK_ACK, deserializes the
+     * body into an ExecuteTaskAckCommand and passes its status, start time,
+     * host, execute path, log path and task instance id to
+     * {@code processService.changeTaskState}.
+     *
+     * @param channel channel the command arrived on (not used here)
+     * @param command received command
+     */
     @Override
     public void process(Channel channel, Command command) {
-        Preconditions.checkArgument(CommandType.EXECUTE_TASK_RESPONSE == 
command.getType(), String.format("invalid command type : %s", 
command.getType()));
-        logger.info("received command : {}", command);
-        ExecuteTaskResponseCommand responseCommand = 
FastJsonSerializer.deserialize(command.getBody(), 
ExecuteTaskResponseCommand.class);
-        
processService.changeTaskState(ExecutionStatus.of(responseCommand.getStatus()), 
responseCommand.getEndTime(), responseCommand.getTaskInstanceId());
-        TaskFuture.notify(command);
+        Preconditions.checkArgument(CommandType.EXECUTE_TASK_ACK == 
command.getType(), String.format("invalid command type : %s", 
command.getType()));
+        ExecuteTaskAckCommand taskAckCommand = 
FastJsonSerializer.deserialize(command.getBody(), ExecuteTaskAckCommand.class);
+        logger.info("taskAckCommand : {}",taskAckCommand);
+        
processService.changeTaskState(ExecutionStatus.of(taskAckCommand.getStatus()),
+                taskAckCommand.getStartTime(),
+                taskAckCommand.getHost(),
+                taskAckCommand.getExecutePath(),
+                taskAckCommand.getLogPath(),
+                taskAckCommand.getTaskInstanceId());
     }
 
-
 }
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
index c3b6a05..b62bb77 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
@@ -25,7 +25,6 @@ import org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
 import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
 import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
-import org.apache.dolphinscheduler.server.master.future.TaskFuture;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -59,7 +58,6 @@ public class TaskResponseProcessor implements 
NettyRequestProcessor {
         logger.info("received command : {}", command);
         ExecuteTaskResponseCommand responseCommand = 
FastJsonSerializer.deserialize(command.getBody(), 
ExecuteTaskResponseCommand.class);
         
processService.changeTaskState(ExecutionStatus.of(responseCommand.getStatus()), 
responseCommand.getEndTime(), responseCommand.getTaskInstanceId());
-        TaskFuture.notify(command);
     }
 
 
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
index 7106cc6..f675493 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
@@ -32,16 +32,23 @@ import 
org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand;
 import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
 import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
-import org.apache.dolphinscheduler.remote.utils.Address;
+import org.apache.dolphinscheduler.remote.future.InvokeCallback;
+import org.apache.dolphinscheduler.remote.future.ResponseFuture;
+import org.apache.dolphinscheduler.remote.utils.Host;
 import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
 import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher;
+import 
org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
+import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
+import 
org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.apache.dolphinscheduler.service.queue.ITaskQueue;
 import org.apache.dolphinscheduler.service.queue.TaskQueueFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
 
 import java.util.concurrent.Callable;
 
@@ -92,9 +99,9 @@ public class MasterBaseTaskExecThread implements 
Callable<Boolean> {
 
 
     /**
-     *  netty remoting client
+     * executor dispatcher
      */
-    private static final NettyRemotingClient nettyRemotingClient = new 
NettyRemotingClient(new NettyClientConfig());
+    private ExecutorDispatcher dispatcher;
 
     /**
      * constructor of MasterBaseTaskExecThread
@@ -102,13 +109,14 @@ public class MasterBaseTaskExecThread implements 
Callable<Boolean> {
      * @param processInstance   process instance
      */
     public MasterBaseTaskExecThread(TaskInstance taskInstance, ProcessInstance 
processInstance){
-        this.processService = BeanContext.getBean(ProcessService.class);
-        this.alertDao = BeanContext.getBean(AlertDao.class);
+        this.processService = 
SpringApplicationContext.getBean(ProcessService.class);
+        this.alertDao = SpringApplicationContext.getBean(AlertDao.class);
         this.processInstance = processInstance;
         this.taskQueue = TaskQueueFactory.getTaskQueueInstance();
         this.cancel = false;
         this.taskInstance = taskInstance;
         this.masterConfig = 
SpringApplicationContext.getBean(MasterConfig.class);
+        this.dispatcher = 
SpringApplicationContext.getBean(ExecutorDispatcher.class);
     }
 
     /**
@@ -126,30 +134,17 @@ public class MasterBaseTaskExecThread implements 
Callable<Boolean> {
         this.cancel = true;
     }
 
-
-    // TODO send task to worker
-    public void sendToWorker(TaskInstance taskInstance){
-        final Address address = new Address("127.0.0.1", 12346);
-
-        ExecuteTaskRequestCommand taskRequestCommand = new 
ExecuteTaskRequestCommand(
-                
FastJsonSerializer.serializeToString(getTaskExecutionContext(taskInstance)));
+    /**
+     * Dispatches a task instance to a worker.
+     * Builds the task's TaskExecutionContext, wraps it in an
+     * ExecutionContext of type WORKER and hands it to the executor
+     * dispatcher. An ExecuteException is logged and not rethrown, so this
+     * method gives the caller no signal that the dispatch failed.
+     *
+     * @param taskInstance task instance to dispatch
+     */
+    public void dispatch(TaskInstance taskInstance){
+        TaskExecutionContext context = getTaskExecutionContext(taskInstance);
+        ExecutionContext executionContext = new ExecutionContext(context, 
ExecutorType.WORKER);
         try {
-            Command responseCommand = nettyRemotingClient.sendSync(address,
-                    taskRequestCommand.convert2Command(), 2000);
-
-            ExecuteTaskAckCommand taskAckCommand = 
FastJsonSerializer.deserialize(
-                    responseCommand.getBody(), ExecuteTaskAckCommand.class);
-
-            logger.info("taskAckCommand : {}",taskAckCommand);
-            
processService.changeTaskState(ExecutionStatus.of(taskAckCommand.getStatus()),
-                    taskAckCommand.getStartTime(),
-                    taskAckCommand.getHost(),
-                    taskAckCommand.getExecutePath(),
-                    taskAckCommand.getLogPath(),
-                    taskInstance.getId());
-
-        } catch (InterruptedException | RemotingException ex) {
-            logger.error(String.format("send command to : %s error", address), 
ex);
+            dispatcher.dispatch(executionContext);
+        } catch (ExecuteException e) {
+            // NOTE(review): the failure is only logged; the submit loop sets
+            // submitQueue = true right after calling dispatch - confirm that
+            // a failed dispatch is meant to count as submitted
+            logger.error("execute exception", e);
         }
     }
 
@@ -239,7 +234,7 @@ public class MasterBaseTaskExecThread implements 
Callable<Boolean> {
                 }
                 if(submitDB && !submitQueue){
                     // submit task to queue
-                    sendToWorker(task);
+                    dispatch(task);
                     submitQueue = true;
                 }
                 if(submitDB && submitQueue){
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
index b8bf1c9..d0f4927 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
@@ -33,10 +33,6 @@ import org.apache.dolphinscheduler.dao.entity.Schedule;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.dao.utils.DagHelper;
 import org.apache.dolphinscheduler.remote.NettyRemotingClient;
-import org.apache.dolphinscheduler.remote.command.Command;
-import org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand;
-import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
-import org.apache.dolphinscheduler.remote.utils.Address;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.utils.AlertManager;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java
index e3eacaf..c7a2d0b 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java
@@ -35,7 +35,7 @@ import java.util.concurrent.locks.ReentrantLock;
 
 
 @Service
-public abstract class ZookeeperNodeManager implements InitializingBean {
+public class ZookeeperNodeManager implements InitializingBean {
 
     private final Logger logger = 
LoggerFactory.getLogger(ZookeeperNodeManager.class);
 
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java
index 96b8424..3364a94 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java
@@ -47,7 +47,7 @@ public class ZookeeperRegistryCenter implements 
InitializingBean {
 
     @Override
     public void afterPropertiesSet() throws Exception {
-
+        init();
     }
 
     public void init() {
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
index cd62e98..632d2f7 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
@@ -76,7 +76,7 @@ public class TaskCallbackService {
      */
     public void sendAck(int taskInstanceId, ExecuteTaskAckCommand ackCommand){
         CallbackChannel callbackChannel = getCallbackChannel(taskInstanceId);
-        
callbackChannel.getChannel().writeAndFlush(ackCommand.convert2Command(callbackChannel.getOpaque()));
+        
callbackChannel.getChannel().writeAndFlush(ackCommand.convert2Command());
     }
 
     /**
@@ -87,8 +87,7 @@ public class TaskCallbackService {
      */
     public void sendResult(int taskInstanceId, ExecuteTaskResponseCommand 
responseCommand){
         CallbackChannel callbackChannel = getCallbackChannel(taskInstanceId);
-        
callbackChannel.getChannel().writeAndFlush(responseCommand.convert2Command(
-                callbackChannel.getOpaque())).addListener(new 
ChannelFutureListener(){
+        
callbackChannel.getChannel().writeAndFlush(responseCommand.convert2Command()).addListener(new
 ChannelFutureListener(){
 
             @Override
             public void operationComplete(ChannelFuture future) throws 
Exception {
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerRequestProcessor.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerRequestProcessor.java
index 038b8ef..39dc136 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerRequestProcessor.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerRequestProcessor.java
@@ -79,9 +79,9 @@ public class WorkerRequestProcessor implements 
NettyRequestProcessor {
         ExecuteTaskRequestCommand taskRequestCommand = 
FastJsonSerializer.deserialize(
                 command.getBody(), ExecuteTaskRequestCommand.class);
 
-        String taskInstanceJson = taskRequestCommand.getTaskInfoJson();
+        String contextJson = taskRequestCommand.getTaskExecutionContext();
 
-        TaskExecutionContext taskExecutionContext = 
JSONObject.parseObject(taskInstanceJson, TaskExecutionContext.class);
+        TaskExecutionContext taskExecutionContext = 
JSONObject.parseObject(contextJson, TaskExecutionContext.class);
 
         // local execute path
         String execLocalPath = getExecLocalPath(taskExecutionContext);
@@ -92,7 +92,7 @@ public class WorkerRequestProcessor implements 
NettyRequestProcessor {
         } catch (Exception ex){
             logger.error(String.format("create execLocalPath : %s", 
execLocalPath), ex);
         }
-        
taskCallbackService.addCallbackChannel(taskExecutionContext.getTaskId(),
+        
taskCallbackService.addCallbackChannel(taskExecutionContext.getTaskInstanceId(),
                 new CallbackChannel(channel, command.getOpaque()));
 
         // submit task
@@ -110,6 +110,6 @@ public class WorkerRequestProcessor implements 
NettyRequestProcessor {
         return FileUtils.getProcessExecDir(taskExecutionContext.getProjectId(),
                 taskExecutionContext.getProcessDefineId(),
                 taskExecutionContext.getProcessInstanceId(),
-                taskExecutionContext.getTaskId());
+                taskExecutionContext.getTaskInstanceId());
     }
 }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java
index c54842b..b288aea 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java
@@ -93,12 +93,12 @@ public class TaskScheduleThread implements Runnable {
     @Override
     public void run() {
 
-        ExecuteTaskResponseCommand responseCommand = new 
ExecuteTaskResponseCommand(taskExecutionContext.getTaskId());
+        ExecuteTaskResponseCommand responseCommand = new 
ExecuteTaskResponseCommand(taskExecutionContext.getTaskInstanceId());
 
         try {
             // tell master that task is in executing
             ExecuteTaskAckCommand ackCommand = 
buildAckCommand(taskExecutionContext.getTaskType());
-            
taskInstanceCallbackService.sendAck(taskExecutionContext.getTaskId(), 
ackCommand);
+            
taskInstanceCallbackService.sendAck(taskExecutionContext.getTaskInstanceId(), 
ackCommand);
 
             logger.info("script path : {}", 
taskExecutionContext.getExecutePath());
             // task node
@@ -118,7 +118,7 @@ public class TaskScheduleThread implements Runnable {
                     taskExecutionContext.getScheduleTime(),
                     taskExecutionContext.getTaskName(),
                     taskExecutionContext.getTaskType(),
-                    taskExecutionContext.getTaskId(),
+                    taskExecutionContext.getTaskInstanceId(),
                     CommonUtils.getSystemEnvPath(),
                     taskExecutionContext.getTenantCode(),
                     taskExecutionContext.getQueue(),
@@ -132,13 +132,13 @@ public class TaskScheduleThread implements Runnable {
             taskProps.setTaskAppId(String.format("%s_%s_%s",
                     taskExecutionContext.getProcessDefineId(),
                     taskExecutionContext.getProcessInstanceId(),
-                    taskExecutionContext.getTaskId()));
+                    taskExecutionContext.getTaskInstanceId()));
 
             // custom logger
             Logger taskLogger = 
LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
                     taskExecutionContext.getProcessDefineId(),
                     taskExecutionContext.getProcessInstanceId(),
-                    taskExecutionContext.getTaskId()));
+                    taskExecutionContext.getTaskInstanceId()));
 
             task = TaskManager.newTask(taskExecutionContext.getTaskType(),
                     taskProps,
@@ -156,14 +156,14 @@ public class TaskScheduleThread implements Runnable {
             //
             responseCommand.setStatus(task.getExitStatus().getCode());
             responseCommand.setEndTime(new Date());
-            logger.info("task instance id : {},task final status : {}", 
taskExecutionContext.getTaskId(), task.getExitStatus());
+            logger.info("task instance id : {},task final status : {}", 
taskExecutionContext.getTaskInstanceId(), task.getExitStatus());
         }catch (Exception e){
             logger.error("task scheduler failure", e);
             kill();
             responseCommand.setStatus(ExecutionStatus.FAILURE.getCode());
             responseCommand.setEndTime(new Date());
         } finally {
-            taskInstanceCallbackService.sendResult(taskExecutionContext.getTaskId(), responseCommand);
+            taskInstanceCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand);
         }
     }
 
@@ -213,13 +213,13 @@ public class TaskScheduleThread implements Runnable {
             return baseLog + Constants.SINGLE_SLASH +
                     taskExecutionContext.getProcessDefineId() + 
Constants.SINGLE_SLASH  +
                     taskExecutionContext.getProcessInstanceId() + 
Constants.SINGLE_SLASH  +
-                    taskExecutionContext.getTaskId() + ".log";
+                    taskExecutionContext.getTaskInstanceId() + ".log";
         }
         return System.getProperty("user.dir") + Constants.SINGLE_SLASH +
                 baseLog +  Constants.SINGLE_SLASH +
                 taskExecutionContext.getProcessDefineId() + 
Constants.SINGLE_SLASH  +
                 taskExecutionContext.getProcessInstanceId() + 
Constants.SINGLE_SLASH  +
-                taskExecutionContext.getTaskId() + ".log";
+                taskExecutionContext.getTaskInstanceId() + ".log";
     }
 
     /**
@@ -325,9 +325,9 @@ public class TaskScheduleThread implements Runnable {
      * @throws Exception exception
      */
     private void checkDownloadPermission(List<String> projectRes) throws 
Exception {
-        int userId = taskExecutionContext.getExecutorId();
+        int executorId = taskExecutionContext.getExecutorId();
         String[] resNames = projectRes.toArray(new String[projectRes.size()]);
-        PermissionCheck<String> permissionCheck = new 
PermissionCheck<>(AuthorizationType.RESOURCE_FILE, 
processService,resNames,userId,logger);
+        PermissionCheck<String> permissionCheck = new 
PermissionCheck<>(AuthorizationType.RESOURCE_FILE, 
processService,resNames,executorId,logger);
         permissionCheck.checkPermission();
     }
 }
\ No newline at end of file
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java
index 5daf535..c979eb2 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java
@@ -16,12 +16,11 @@
  */
 package org.apache.dolphinscheduler.service.log;
 
-import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.remote.NettyRemotingClient;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.log.*;
 import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
-import org.apache.dolphinscheduler.remote.utils.Address;
+import org.apache.dolphinscheduler.remote.utils.Host;
 import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -73,7 +72,7 @@ public class LogClientService {
         logger.info("roll view log, host : {}, port : {}, path {}, skipLineNum 
{} ,limit {}", host, port, path, skipLineNum, limit);
         RollViewLogRequestCommand request = new 
RollViewLogRequestCommand(path, skipLineNum, limit);
         String result = "";
-        final Address address = new Address(host, port);
+        final Host address = new Host(host, port);
         try {
             Command command = request.convert2Command();
             Command response = this.client.sendSync(address, command, 
logRequestTimeout);
@@ -101,7 +100,7 @@ public class LogClientService {
         logger.info("view log path {}", path);
         ViewLogRequestCommand request = new ViewLogRequestCommand(path);
         String result = "";
-        final Address address = new Address(host, port);
+        final Host address = new Host(host, port);
         try {
             Command command = request.convert2Command();
             Command response = this.client.sendSync(address, command, 
logRequestTimeout);
@@ -129,7 +128,7 @@ public class LogClientService {
         logger.info("log path {}", path);
         GetLogBytesRequestCommand request = new 
GetLogBytesRequestCommand(path);
         byte[] result = null;
-        final Address address = new Address(host, port);
+        final Host address = new Host(host, port);
         try {
             Command command = request.convert2Command();
             Command response = this.client.sendSync(address, command, 
logRequestTimeout);

Reply via email to