This is an automated email from the ASF dual-hosted git repository.

journey pushed a commit to branch refactor-worker
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/refactor-worker by this push:
     new 9dc9745  Refactor worker (#2018)
9dc9745 is described below

commit 9dc9745a80957c56b0dbba4a218c6e3b0cff77ac
Author: Tboy <[email protected]>
AuthorDate: Wed Feb 26 09:57:21 2020 +0800

    Refactor worker (#2018)
    
    * Refactor worker (#7)
    
    * Refactor worker (#2000)
    
    * Refactor worker (#2)
    
    * Refactor worker (#1993)
    
    * Refactor worker (#1)
    
    * add TaskResponseProcessor (#1983)
    
    * 1, master persistent task 2. extract  master and worker communication model (#1992)
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    Co-authored-by: qiaozhanwei <[email protected]>
    
    * updates
    
    Co-authored-by: qiaozhanwei <[email protected]>
    
    * TaskExecutionContext create modify (#1994)
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    * TaskExecutionContext create modify
    
    Co-authored-by: qiaozhanwei <[email protected]>
    
    * updates
    
    * add- register processor
    
    Co-authored-by: qiaozhanwei <[email protected]>
    
    * buildAckCommand taskInstanceId not set modify (#2002)
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    * TaskExecutionContext create modify
    
    * buildAckCommand taskInstanceId not set modify
    
    * java doc error modify (#2004)
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    * TaskExecutionContext create modify
    
    * buildAckCommand taskInstanceId not set modify
    
    * java doc error modify
    
    * add comment (#2006)
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    * TaskExecutionContext create modify
    
    * buildAckCommand taskInstanceId not set modify
    
    * java doc error modify
    
    * add comment
    
    * ExecutorManager interface add generic type (#2012)
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    * TaskExecutionContext create modify
    
    * buildAckCommand taskInstanceId not set modify
    
    * java doc error modify
    
    * add comment
    
    * ExecutorManager interface add generic type
    
    Co-authored-by: qiaozhanwei <[email protected]>
    
    * Refactor worker (#8)
    
    * Refactor worker (#2000)
    
    * Refactor worker (#2)
    
    * Refactor worker (#1993)
    
    * Refactor worker (#1)
    
    * add TaskResponseProcessor (#1983)
    
    * 1, master persistent task 2. extract  master and worker communication model (#1992)
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    Co-authored-by: qiaozhanwei <[email protected]>
    
    * updates
    
    Co-authored-by: qiaozhanwei <[email protected]>
    
    * TaskExecutionContext create modify (#1994)
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    * TaskExecutionContext create modify
    
    Co-authored-by: qiaozhanwei <[email protected]>
    
    * updates
    
    * add- register processor
    
    Co-authored-by: qiaozhanwei <[email protected]>
    
    * buildAckCommand taskInstanceId not set modify (#2002)
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    * TaskExecutionContext create modify
    
    * buildAckCommand taskInstanceId not set modify
    
    * java doc error modify (#2004)
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    * TaskExecutionContext create modify
    
    * buildAckCommand taskInstanceId not set modify
    
    * java doc error modify
    
    * add comment (#2006)
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    * TaskExecutionContext create modify
    
    * buildAckCommand taskInstanceId not set modify
    
    * java doc error modify
    
    * add comment
    
    * ExecutorManager interface add generic type (#2012)
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    * TaskExecutionContext create modify
    
    * buildAckCommand taskInstanceId not set modify
    
    * java doc error modify
    
    * add comment
    
    * ExecutorManager interface add generic type
    
    Co-authored-by: qiaozhanwei <[email protected]>
    
    * add kill command
    
    Co-authored-by: qiaozhanwei <[email protected]>
---
 .../remote/command/CommandType.java                |   2 +-
 .../remote/command/KillTaskRequestCommand.java     |   1 +
 .../server/worker/WorkerServer.java                |   6 +-
 ...allbackChannel.java => NettyRemoteChannel.java} |  39 ++++++--
 .../worker/processor/TaskCallbackService.java      |  84 ++++++++++-------
 ...estProcessor.java => TaskExecuteProcessor.java} |  71 ++++++++++++--
 .../server/worker/processor/TaskKillProcessor.java | 103 +++++++++++++++++++++
 .../server/worker/runner/FetchTaskThread.java      |   2 +-
 ...kScheduleThread.java => TaskExecuteThread.java} |  71 ++------------
 9 files changed, 264 insertions(+), 115 deletions(-)

diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
index 79ef2d9..053b38f 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
@@ -1 +1 @@
-/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dolphinscheduler.remote.command;


public enum CommandType {

    /**
     *  roll view log request
     */
    ROLL_VIEW_LOG_REQUEST,

    /**
     *  roll view log response
     */
    ROLL_VIEW_LOG_RESPONSE,

    /**
     * view whole log request
     */
    VIEW_WHOLE_LOG_REQUEST,

    /**
     * view whole log response
     */
    VIEW_WHOLE_LOG_RESPONSE,

    /**
     * get log bytes request
     */
    GET_LOG_BYTES_REQUEST,

    /**
     * get log bytes response
     */
    GET_LOG_BYTES_RESPONSE,


    WORKER_REQUEST,
    MASTER_RESPONSE,

    /**
     * execute task request
     */
    EXECUTE_TASK_REQUEST,


    /**
     * execute task ack
     */
    EXECUTE_TASK_ACK,
    /**
     * execute task response
     */
    EXECUTE_TASK_RESPONSE,

    /**
     *  ping
     */
    PING,

    /**
     *  pong
     */
    PONG;
}
\ No newline at end of file
+/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dolphinscheduler.remote.command;


public enum CommandType {

    /**
     *  roll view log request
     */
    ROLL_VIEW_LOG_REQUEST,

    /**
     *  roll view log response
     */
    ROLL_VIEW_LOG_RESPONSE,

    /**
     * view whole log request
     */
    VIEW_WHOLE_LOG_REQUEST,

    /**
     * view whole log response
     */
    VIEW_WHOLE_LOG_RESPONSE,

    /**
     * get log bytes request
     */
    GET_LOG_BYTES_REQUEST,

    /**
     * get log bytes response
     */
    GET_LOG_BYTES_RESPONSE,


    WORKER_REQUEST,
    MASTER_RESPONSE,

    /**
     * execute task request
     */
    EXECUTE_TASK_REQUEST,

    /**
     * execute task ack
     */
    EXECUTE_TASK_ACK,

    /**
     * execute task response
     */
    EXECUTE_TASK_RESPONSE,

    /**
     * kill task
     */
    KILL_TASK_REQUEST,

    /**
     * kill task response
     */
    KILL_TASK_RESPONSE,

    /**
     *  ping
     */
    PING,

    /**
     *  pong
     */
    PONG;
}
\ No newline at end of file
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/KillTaskRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/KillTaskRequestCommand.java
new file mode 100644
index 0000000..3ece650
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/KillTaskRequestCommand.java
@@ -0,0 +1 @@
+/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dolphinscheduler.remote.command;

import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;

import java.io.Serializable;

/**
 *  kill task request command
 */
public class KillTaskRequestCommand implements Serializable {

    private int taskInstanceId;

    private int processId;

    private String host;

    private String tenantCode;

    private String logPath;

    private String executePath;

    public String getLogPath() {
        return logPath;
    }

    public void setLogPath(String logPath) {
        this.logPath = logPath;
    }

    public int getTaskInstanceId() {
        return taskInstanceId;
    }

    public void setTaskInstanceId(int taskInstanceId) {
        this.taskInstanceId = taskInstanceId;
    }

    public int getProcessId() {
        return processId;
    }

    public void setProcessId(int processId) {
        this.processId = processId;
    }

    public String getHost() {
        return host;
    }

    public void setHost(String host) {
        this.host = host;
    }

    public String getTenantCode() {
        return tenantCode;
    }

    public void setTenantCode(String tenantCode) {
        this.tenantCode = tenantCode;
    }

    public String getExecutePath() {
        return executePath;
    }

    public void setExecutePath(String executePath) {
        this.executePath = executePath;
    }

    /**
     *  package request command
     *
     * @return command
     */
    public Command convert2Command(){
        Command command = new Command();
        command.setType(CommandType.KILL_TASK_REQUEST);
        byte[] body = FastJsonSerializer.serialize(this);
        command.setBody(body);
        return command;
    }


}
\ No newline at end of file
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
index 2625d68..c2af7b1 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
@@ -35,7 +35,8 @@ import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
 import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
 import org.apache.dolphinscheduler.server.utils.ProcessUtils;
 import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
-import org.apache.dolphinscheduler.server.worker.processor.WorkerRequestProcessor;
+import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor;
+import org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor;
 import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry;
 import org.apache.dolphinscheduler.server.zk.ZKWorkerClient;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
@@ -168,7 +169,8 @@ public class WorkerServer implements IStoppable {
         //init remoting server
         NettyServerConfig serverConfig = new NettyServerConfig();
         this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
-        this.nettyRemotingServer.registerProcessor(CommandType.EXECUTE_TASK_REQUEST, new WorkerRequestProcessor(processService));
+        this.nettyRemotingServer.registerProcessor(CommandType.EXECUTE_TASK_REQUEST, new TaskExecuteProcessor(processService));
+        this.nettyRemotingServer.registerProcessor(CommandType.KILL_TASK_REQUEST, new TaskKillProcessor());
         this.nettyRemotingServer.start();
 
         this.workerRegistry = new WorkerRegistry(zookeeperRegistryCenter, serverConfig.getListenPort());
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/CallbackChannel.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/NettyRemoteChannel.java
similarity index 60%
rename from dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/CallbackChannel.java
rename to dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/NettyRemoteChannel.java
index e3d893f..cbb8972 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/CallbackChannel.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/NettyRemoteChannel.java
@@ -18,24 +18,35 @@
 package org.apache.dolphinscheduler.server.worker.processor;
 
 import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
+import org.apache.dolphinscheduler.remote.utils.Host;
 
 /**
  *  callback channel
  */
-public class CallbackChannel {
+public class NettyRemoteChannel {
 
     /**
      *  channel
      */
-    private Channel channel;
+    private final Channel channel;
 
     /**
      *  equest unique identification
      */
-    private long opaque;
+    private final long opaque;
 
-    public CallbackChannel(Channel channel, long opaque) {
+    /**
+     * master host
+     */
+    private final Host host;
+
+
+    public NettyRemoteChannel(Channel channel, long opaque) {
         this.channel = channel;
+        this.host = ChannelUtils.toAddress(channel);
         this.opaque = opaque;
     }
 
@@ -43,15 +54,23 @@ public class CallbackChannel {
         return channel;
     }
 
-    public void setChannel(Channel channel) {
-        this.channel = channel;
-    }
-
     public long getOpaque() {
         return opaque;
     }
 
-    public void setOpaque(long opaque) {
-        this.opaque = opaque;
+    public Host getHost() {
+        return host;
+    }
+
+    public boolean isActive(){
+        return this.channel.isActive();
+    }
+
+    public ChannelFuture writeAndFlush(Command command){
+        return this.channel.writeAndFlush(command);
+    }
+
+    public void close(){
+        this.channel.close();
     }
 }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
index 632d2f7..23ac7e2 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
@@ -21,8 +21,12 @@ package org.apache.dolphinscheduler.server.worker.processor;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
+import org.apache.dolphinscheduler.remote.NettyRemotingClient;
 import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand;
 import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
+import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -31,18 +35,31 @@ import java.util.concurrent.ConcurrentHashMap;
  */
 public class TaskCallbackService {
 
+    private final Logger logger = 
LoggerFactory.getLogger(TaskCallbackService.class);
+
+    /**
+     *  remote channels
+     */
+    private static final ConcurrentHashMap<Integer, NettyRemoteChannel> 
REMOTE_CHANNELS = new ConcurrentHashMap<>();
+
     /**
-     *  callback channels
+     * netty remoting client
      */
-    private static final ConcurrentHashMap<Integer, CallbackChannel> 
CALL_BACK_CHANNELS = new ConcurrentHashMap<>();
+    private final NettyRemotingClient nettyRemotingClient;
+
+
+    public TaskCallbackService(){
+        final NettyClientConfig clientConfig = new NettyClientConfig();
+        this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
+    }
 
     /**
      *  add callback channel
      * @param taskInstanceId taskInstanceId
      * @param channel  channel
      */
-    public void addCallbackChannel(int taskInstanceId, CallbackChannel 
channel){
-        CALL_BACK_CHANNELS.put(taskInstanceId, channel);
+    public void addRemoteChannel(int taskInstanceId, NettyRemoteChannel 
channel){
+        REMOTE_CHANNELS.put(taskInstanceId, channel);
     }
 
     /**
@@ -50,15 +67,18 @@ public class TaskCallbackService {
      * @param taskInstanceId taskInstanceId
      * @return callback channel
      */
-    public CallbackChannel getCallbackChannel(int taskInstanceId){
-        CallbackChannel callbackChannel = 
CALL_BACK_CHANNELS.get(taskInstanceId);
-        if(callbackChannel.getChannel().isActive()){
-            return callbackChannel;
+    public NettyRemoteChannel getRemoteChannel(int taskInstanceId){
+        NettyRemoteChannel nettyRemoteChannel = 
REMOTE_CHANNELS.get(taskInstanceId);
+        if(nettyRemoteChannel.isActive()){
+            return nettyRemoteChannel;
+        }
+        Channel newChannel = 
nettyRemotingClient.getChannel(nettyRemoteChannel.getHost());
+        if(newChannel != null){
+            NettyRemoteChannel remoteChannel = new 
NettyRemoteChannel(newChannel, nettyRemoteChannel.getOpaque());
+            addRemoteChannel(taskInstanceId, remoteChannel);
+            return remoteChannel;
         }
-        Channel newChannel = createChannel();
-        callbackChannel.setChannel(newChannel);
-        CALL_BACK_CHANNELS.put(taskInstanceId, callbackChannel);
-        return callbackChannel;
+        return null;
     }
 
     /**
@@ -66,7 +86,7 @@ public class TaskCallbackService {
      * @param taskInstanceId taskInstanceId
      */
     public void remove(int taskInstanceId){
-        CALL_BACK_CHANNELS.remove(taskInstanceId);
+        REMOTE_CHANNELS.remove(taskInstanceId);
     }
 
     /**
@@ -75,8 +95,12 @@ public class TaskCallbackService {
      * @param ackCommand ackCommand
      */
     public void sendAck(int taskInstanceId, ExecuteTaskAckCommand ackCommand){
-        CallbackChannel callbackChannel = getCallbackChannel(taskInstanceId);
-        
callbackChannel.getChannel().writeAndFlush(ackCommand.convert2Command());
+        NettyRemoteChannel nettyRemoteChannel = 
getRemoteChannel(taskInstanceId);
+        if(nettyRemoteChannel == null){
+            //TODO
+        } else{
+            nettyRemoteChannel.writeAndFlush(ackCommand.convert2Command());
+        }
     }
 
     /**
@@ -86,22 +110,20 @@ public class TaskCallbackService {
      * @param responseCommand responseCommand
      */
     public void sendResult(int taskInstanceId, ExecuteTaskResponseCommand 
responseCommand){
-        CallbackChannel callbackChannel = getCallbackChannel(taskInstanceId);
-        
callbackChannel.getChannel().writeAndFlush(responseCommand.convert2Command()).addListener(new
 ChannelFutureListener(){
-
-            @Override
-            public void operationComplete(ChannelFuture future) throws 
Exception {
-                if(future.isSuccess()){
-                    remove(taskInstanceId);
-                    return;
-                }
-            }
-        });
-    }
+        NettyRemoteChannel nettyRemoteChannel = 
getRemoteChannel(taskInstanceId);
+        if(nettyRemoteChannel == null){
+            //TODO
+        } else{
+            
nettyRemoteChannel.writeAndFlush(responseCommand.convert2Command()).addListener(new
 ChannelFutureListener(){
 
-    // TODO
-    private Channel createChannel(){
-        return null;
+                @Override
+                public void operationComplete(ChannelFuture future) throws 
Exception {
+                    if(future.isSuccess()){
+                        remove(taskInstanceId);
+                        return;
+                    }
+                }
+            });
+        }
     }
-
 }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerRequestProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
similarity index 55%
rename from dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerRequestProcessor.java
rename to dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
index 39dc136..818e223 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerRequestProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
@@ -17,32 +17,41 @@
 
 package org.apache.dolphinscheduler.server.worker.processor;
 
+import ch.qos.logback.classic.LoggerContext;
+import ch.qos.logback.classic.sift.SiftingAppender;
 import com.alibaba.fastjson.JSONObject;
 import io.netty.channel.Channel;
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.common.enums.TaskType;
+import org.apache.dolphinscheduler.common.log.TaskLogDiscriminator;
 import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.common.utils.FileUtils;
+import org.apache.dolphinscheduler.common.utils.OSUtils;
 import org.apache.dolphinscheduler.common.utils.Preconditions;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand;
 import org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand;
 import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
 import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
 import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
-import org.apache.dolphinscheduler.server.worker.runner.TaskScheduleThread;
+import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Date;
 import java.util.concurrent.ExecutorService;
 
 /**
  *  worker request processor
  */
-public class WorkerRequestProcessor implements NettyRequestProcessor {
+public class TaskExecuteProcessor implements NettyRequestProcessor {
 
-    private final Logger logger = 
LoggerFactory.getLogger(WorkerRequestProcessor.class);
+    private final Logger logger = 
LoggerFactory.getLogger(TaskExecuteProcessor.class);
 
     /**
      * process service
@@ -64,7 +73,7 @@ public class WorkerRequestProcessor implements 
NettyRequestProcessor {
      */
     private final TaskCallbackService taskCallbackService;
 
-    public WorkerRequestProcessor(ProcessService processService){
+    public TaskExecuteProcessor(ProcessService processService){
         this.processService = processService;
         this.taskCallbackService = new TaskCallbackService();
         this.workerConfig = 
SpringApplicationContext.getBean(WorkerConfig.class);
@@ -92,14 +101,62 @@ public class WorkerRequestProcessor implements 
NettyRequestProcessor {
         } catch (Exception ex){
             logger.error(String.format("create execLocalPath : %s", 
execLocalPath), ex);
         }
-        
taskCallbackService.addCallbackChannel(taskExecutionContext.getTaskInstanceId(),
-                new CallbackChannel(channel, command.getOpaque()));
+        
taskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(),
+                new NettyRemoteChannel(channel, command.getOpaque()));
 
+        this.doAck(taskExecutionContext);
         // submit task
-        workerExecService.submit(new TaskScheduleThread(taskExecutionContext,
+        workerExecService.submit(new TaskExecuteThread(taskExecutionContext,
                 processService, taskCallbackService));
     }
 
+    private void doAck(TaskExecutionContext taskExecutionContext){
+        // tell master that task is in executing
+        ExecuteTaskAckCommand ackCommand = 
buildAckCommand(taskExecutionContext);
+        taskCallbackService.sendAck(taskExecutionContext.getTaskInstanceId(), 
ackCommand);
+    }
+
+    /**
+     * get task log path
+     * @return log path
+     */
+    private String getTaskLogPath(TaskExecutionContext taskExecutionContext) {
+        String baseLog = ((TaskLogDiscriminator) ((SiftingAppender) 
((LoggerContext) LoggerFactory.getILoggerFactory())
+                .getLogger("ROOT")
+                .getAppender("TASKLOGFILE"))
+                .getDiscriminator()).getLogBase();
+        if (baseLog.startsWith(Constants.SINGLE_SLASH)){
+            return baseLog + Constants.SINGLE_SLASH +
+                    taskExecutionContext.getProcessDefineId() + 
Constants.SINGLE_SLASH  +
+                    taskExecutionContext.getProcessInstanceId() + 
Constants.SINGLE_SLASH  +
+                    taskExecutionContext.getTaskInstanceId() + ".log";
+        }
+        return System.getProperty("user.dir") + Constants.SINGLE_SLASH +
+                baseLog +  Constants.SINGLE_SLASH +
+                taskExecutionContext.getProcessDefineId() + 
Constants.SINGLE_SLASH  +
+                taskExecutionContext.getProcessInstanceId() + 
Constants.SINGLE_SLASH  +
+                taskExecutionContext.getTaskInstanceId() + ".log";
+    }
+
+    /**
+     * build ack command
+     * @param taskExecutionContext taskExecutionContext
+     * @return ExecuteTaskAckCommand
+     */
+    private ExecuteTaskAckCommand buildAckCommand(TaskExecutionContext 
taskExecutionContext) {
+        ExecuteTaskAckCommand ackCommand = new ExecuteTaskAckCommand();
+        ackCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
+        ackCommand.setStatus(ExecutionStatus.RUNNING_EXEUTION.getCode());
+        ackCommand.setLogPath(getTaskLogPath(taskExecutionContext));
+        ackCommand.setHost(OSUtils.getHost());
+        ackCommand.setStartTime(new Date());
+        if(taskExecutionContext.getTaskType().equals(TaskType.SQL.name()) || 
taskExecutionContext.getTaskType().equals(TaskType.PROCEDURE.name())){
+            ackCommand.setExecutePath(null);
+        }else{
+            ackCommand.setExecutePath(taskExecutionContext.getExecutePath());
+        }
+        return ackCommand;
+    }
 
     /**
      * get execute local path
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
new file mode 100644
index 0000000..2985374
--- /dev/null
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.processor;
+
+import io.netty.channel.Channel;
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.utils.LoggerUtils;
+import org.apache.dolphinscheduler.common.utils.OSUtils;
+import org.apache.dolphinscheduler.common.utils.Preconditions;
+import org.apache.dolphinscheduler.common.utils.StringUtils;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.command.KillTaskRequestCommand;
+import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
+import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
+import org.apache.dolphinscheduler.server.utils.ProcessUtils;
+import org.apache.dolphinscheduler.service.log.LogClientService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
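+ *  task kill processor: handles KILL_TASK_REQUEST commands by running "sudo kill -9" on the pids resolved from the command's process id, then cancelling the application ids found in the task log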
+ */
+public class TaskKillProcessor implements NettyRequestProcessor {
+
+    private final Logger logger = 
LoggerFactory.getLogger(TaskKillProcessor.class);
+
+    @Override
+    public void process(Channel channel, Command command) {
+        Preconditions.checkArgument(CommandType.KILL_TASK_REQUEST == 
command.getType(), String.format("invalid command type : %s", 
command.getType()));
+        logger.info("received command : {}", command);
+        KillTaskRequestCommand killCommand = 
FastJsonSerializer.deserialize(command.getBody(), KillTaskRequestCommand.class);
+        doKill(killCommand);
+    }
+
+
+    private void doKill(KillTaskRequestCommand killCommand){
+        try {
+            if(killCommand.getProcessId() == 0 ){
+                logger.error("process kill failed, process id :{}, task 
id:{}", killCommand.getProcessId(), killCommand.getTaskInstanceId());
+                return;
+            }
+            String cmd = String.format("sudo kill -9 %s", 
ProcessUtils.getPidsStr(killCommand.getProcessId()));
+
+            logger.info("process id:{}, cmd:{}", killCommand.getProcessId(), 
cmd);
+
+            OSUtils.exeCmd(cmd);
+
+            // read the task log and cancel the yarn applications whose ids appear in it
+            killYarnJob(killCommand.getHost(), killCommand.getLogPath(), 
killCommand.getExecutePath(), killCommand.getTenantCode());
+
+        } catch (Exception e) {
+            logger.error("kill task failed", e);
+        }
+    }
+
+    public void killYarnJob(String host, String logPath, String executePath, 
String tenantCode) {
+        try {
+            Thread.sleep(Constants.SLEEP_TIME_MILLIS);
+            LogClientService logClient = null;
+            String log = null;
+            try {
+                logClient = new LogClientService();
+                log = logClient.viewLog(host, Constants.RPC_PORT, logPath);
+            } finally {
+                if(logClient != null){
+                    logClient.close();
+                }
+            }
+            if (StringUtils.isNotEmpty(log)) {
+                List<String> appIds = LoggerUtils.getAppIds(log, logger);
+                if (StringUtils.isEmpty(executePath)) {
+                    logger.error("task instance work dir is empty");
+                    throw new RuntimeException("task instance work dir is 
empty");
+                }
+                if (appIds.size() > 0) {
+                    ProcessUtils.cancelApplication(appIds, logger, tenantCode, 
executePath);
+                }
+            }
+
+        } catch (Exception e) {
+            logger.error("kill yarn job failure",e);
+        }
+    }
+
+}
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/FetchTaskThread.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/FetchTaskThread.java
index 9e0c452..0498848 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/FetchTaskThread.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/FetchTaskThread.java
@@ -235,7 +235,7 @@ public class FetchTaskThread implements Runnable{
 
                     logger.info("task : {} ready to submit to task scheduler 
thread",taskInstId);
                     // submit task
-//                    workerExecService.submit(new 
TaskScheduleThread(taskInstance, processService));
+//                    workerExecService.submit(new 
TaskExecuteThread(taskInstance, processService));
 
                     // remove node from zk
                     removeNodeFromTaskQueue(taskQueueStr);
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
similarity index 76%
rename from 
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java
rename to 
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
index 735e4ba..be89401 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
@@ -17,21 +17,15 @@
 package org.apache.dolphinscheduler.server.worker.runner;
 
 
-import ch.qos.logback.classic.LoggerContext;
-import ch.qos.logback.classic.sift.SiftingAppender;
 import com.alibaba.fastjson.JSONObject;
-import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.AuthorizationType;
 import org.apache.dolphinscheduler.common.enums.CommandType;
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
-import org.apache.dolphinscheduler.common.enums.TaskType;
-import org.apache.dolphinscheduler.common.log.TaskLogDiscriminator;
 import org.apache.dolphinscheduler.common.model.TaskNode;
 import org.apache.dolphinscheduler.common.process.Property;
 import org.apache.dolphinscheduler.common.task.AbstractParameters;
 import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter;
 import org.apache.dolphinscheduler.common.utils.*;
-import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand;
 import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
 import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
@@ -51,12 +45,12 @@ import java.util.stream.Collectors;
 /**
  *  task execute thread
  */
-public class TaskScheduleThread implements Runnable {
+public class TaskExecuteThread implements Runnable {
 
     /**
      * logger
      */
-    private final Logger logger = 
LoggerFactory.getLogger(TaskScheduleThread.class);
+    private final Logger logger = 
LoggerFactory.getLogger(TaskExecuteThread.class);
 
     /**
      *  task instance
@@ -74,32 +68,27 @@ public class TaskScheduleThread implements Runnable {
     private AbstractTask task;
 
     /**
-     *  task instance callback service
+     *  task callback service
      */
-    private TaskCallbackService taskInstanceCallbackService;
+    private TaskCallbackService taskCallbackService;
 
     /**
      *  constructor
      * @param taskExecutionContext taskExecutionContext
      * @param processService processService
-     * @param taskInstanceCallbackService taskInstanceCallbackService
+     * @param taskCallbackService taskCallbackService
      */
-    public TaskScheduleThread(TaskExecutionContext taskExecutionContext, 
ProcessService processService, TaskCallbackService taskInstanceCallbackService){
+    public TaskExecuteThread(TaskExecutionContext taskExecutionContext, 
ProcessService processService, TaskCallbackService taskCallbackService){
         this.processService = processService;
         this.taskExecutionContext = taskExecutionContext;
-        this.taskInstanceCallbackService = taskInstanceCallbackService;
+        this.taskCallbackService = taskCallbackService;
     }
 
     @Override
     public void run() {
 
         ExecuteTaskResponseCommand responseCommand = new 
ExecuteTaskResponseCommand(taskExecutionContext.getTaskInstanceId());
-
         try {
-            // tell master that task is in executing
-            ExecuteTaskAckCommand ackCommand = 
buildAckCommand(taskExecutionContext);
-            
taskInstanceCallbackService.sendAck(taskExecutionContext.getTaskInstanceId(), 
ackCommand);
-
             logger.info("script path : {}", 
taskExecutionContext.getExecutePath());
             // task node
             TaskNode taskNode = 
JSONObject.parseObject(taskExecutionContext.getTaskJson(), TaskNode.class);
@@ -163,7 +152,7 @@ public class TaskScheduleThread implements Runnable {
             responseCommand.setStatus(ExecutionStatus.FAILURE.getCode());
             responseCommand.setEndTime(new Date());
         } finally {
-            
taskInstanceCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(),
 responseCommand);
+            
taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), 
responseCommand);
         }
     }
 
@@ -184,48 +173,6 @@ public class TaskScheduleThread implements Runnable {
     }
 
     /**
-     * build ack command
-     * @param taskExecutionContext taskExecutionContext
-     * @return ExecuteTaskAckCommand
-     */
-    private ExecuteTaskAckCommand buildAckCommand(TaskExecutionContext 
taskExecutionContext) {
-        ExecuteTaskAckCommand ackCommand = new ExecuteTaskAckCommand();
-        ackCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
-        ackCommand.setStatus(ExecutionStatus.RUNNING_EXEUTION.getCode());
-        ackCommand.setLogPath(getTaskLogPath());
-        ackCommand.setHost(OSUtils.getHost());
-        ackCommand.setStartTime(new Date());
-        if(taskExecutionContext.getTaskType().equals(TaskType.SQL.name()) || 
taskExecutionContext.getTaskType().equals(TaskType.PROCEDURE.name())){
-            ackCommand.setExecutePath(null);
-        }else{
-            ackCommand.setExecutePath(taskExecutionContext.getExecutePath());
-        }
-        return ackCommand;
-    }
-
-    /**
-     * get task log path
-     * @return log path
-     */
-    private String getTaskLogPath() {
-        String baseLog = ((TaskLogDiscriminator) ((SiftingAppender) 
((LoggerContext) LoggerFactory.getILoggerFactory())
-                .getLogger("ROOT")
-                .getAppender("TASKLOGFILE"))
-                .getDiscriminator()).getLogBase();
-        if (baseLog.startsWith(Constants.SINGLE_SLASH)){
-            return baseLog + Constants.SINGLE_SLASH +
-                    taskExecutionContext.getProcessDefineId() + 
Constants.SINGLE_SLASH  +
-                    taskExecutionContext.getProcessInstanceId() + 
Constants.SINGLE_SLASH  +
-                    taskExecutionContext.getTaskInstanceId() + ".log";
-        }
-        return System.getProperty("user.dir") + Constants.SINGLE_SLASH +
-                baseLog +  Constants.SINGLE_SLASH +
-                taskExecutionContext.getProcessDefineId() + 
Constants.SINGLE_SLASH  +
-                taskExecutionContext.getProcessInstanceId() + 
Constants.SINGLE_SLASH  +
-                taskExecutionContext.getTaskInstanceId() + ".log";
-    }
-
-    /**
      * set task timeout
      * @param taskProps
      * @param taskNode
@@ -259,8 +206,6 @@ public class TaskScheduleThread implements Runnable {
     }
 
 
-
-
     /**
      *  kill task
      */

Reply via email to