This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch refactor-worker
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/refactor-worker by this push:
     new b52542f  master kill task refactor (#2069)
b52542f is described below

commit b52542f250f2339bf100835471b9074d2739d4ab
Author: qiaozhanwei <[email protected]>
AuthorDate: Tue Mar 3 17:58:22 2020 +0800

    master kill task refactor (#2069)
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    * TaskExecutionContext create modify
    
    * buildAckCommand taskInstanceId not set modify
    
    * java doc error modify
    
    * add comment
    
    * ExecutorManager interface add generic type
    
    * add TaskInstanceCacheManager receive Worker report result
    
    * TaskInstance setExecutePath
    
    * add TaskInstanceCacheManager to receive Worker Task result report
    
    * TaskInstanceCacheManager add remove method
    
    * add license
    
    * add dispatch task method
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * taskInstanceCache is null ,need load from db
    
    * taskInstanceCache is null ,need load from db
    
    * taskInstanceCache is null ,need load from db
    
    * 1,worker TaskPros use TaskExecutionContext replace
    2,Master kill Task , KillTaskProcessor modify
    
    * worker remove db
    
    * ShellTask modify
    
    * master persistence processId and appIds
    
    * master persistence processId and appIds
    
    * master add kill task logic
    
    * master add kill task logic
    
    * master add kill task logic
    
    * javadoc error modify
    
    * remove chinese log
    
    * executeDirectly method add Override
    
    * remote module modify
    
    * TaskKillResponseProcessor command type modify
    
    * create buildKillCommand
---
 .../server/master/MasterServer.java                |  2 +-
 .../dispatch/executor/NettyExecutorManager.java    | 28 ++++++++++++++++++++--
 .../processor/TaskKillResponseProcessor.java       |  3 +--
 .../server/master/runner/MasterTaskExecThread.java |  2 +-
 4 files changed, 29 insertions(+), 6 deletions(-)

diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
index 9c46ad6..6923cd0 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
@@ -128,7 +128,7 @@ public class MasterServer implements IStoppable {
         this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
         
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, 
new TaskResponseProcessor());
         
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, new 
TaskAckProcessor());
-        
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, 
new TaskKillResponseProcessor());
+        
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, new 
TaskKillResponseProcessor());
         this.nettyRemotingServer.start();
 
         //
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
index 286b0e6..7719cf8 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
@@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.remote.NettyRemotingClient;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
+import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand;
 import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
 import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
 import org.apache.dolphinscheduler.remote.utils.Host;
@@ -71,7 +72,7 @@ public class NettyExecutorManager extends 
AbstractExecutorManager<Boolean>{
          */
         
this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, 
new TaskResponseProcessor());
         
this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_ACK, new 
TaskAckProcessor());
-        
this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, 
new TaskKillResponseProcessor());
+        
this.nettyRemotingClient.registerProcessor(CommandType.TASK_KILL_RESPONSE, new 
TaskKillResponseProcessor());
     }
 
 
@@ -130,8 +131,9 @@ public class NettyExecutorManager extends 
AbstractExecutorManager<Boolean>{
         return success;
     }
 
+    @Override
     public void executeDirectly(ExecutionContext context) throws 
ExecuteException {
-        Command command = buildCommand(context);
+        Command command = buildKillCommand(context);
         Host host = context.getHost();
         doExecute(host,command);
     }
@@ -159,6 +161,28 @@ public class NettyExecutorManager extends 
AbstractExecutorManager<Boolean>{
     }
 
     /**
+     *  build command
+     * @param context context
+     * @return command
+     */
+    private Command buildKillCommand(ExecutionContext context) {
+        TaskKillRequestCommand requestCommand = new TaskKillRequestCommand();
+        ExecutorType executorType = context.getExecutorType();
+        switch (executorType){
+            case WORKER:
+                TaskExecutionContext taskExecutionContext = 
context.getContext();
+                
requestCommand.setTaskExecutionContext(FastJsonSerializer.serializeToString(taskExecutionContext));
+                break;
+            case CLIENT:
+                break;
+            default:
+                throw new IllegalArgumentException("invalid executor type : " 
+ executorType);
+
+        }
+        return requestCommand.convert2Command();
+    }
+
+    /**
      *  execute logic
      * @param host host
      * @param command command
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java
index 4986e89..3e8cdfd 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java
@@ -46,8 +46,7 @@ public class TaskKillResponseProcessor implements 
NettyRequestProcessor {
         Preconditions.checkArgument(CommandType.TASK_KILL_RESPONSE == 
command.getType(), String.format("invalid command type : %s", 
command.getType()));
 
         TaskKillResponseCommand responseCommand = 
FastJsonSerializer.deserialize(command.getBody(), 
TaskKillResponseCommand.class);
-        logger.info("received command : {}", responseCommand);
-        logger.info("已经接受到了worker杀任务的回应");
+        logger.info("received task kill response command : {}", 
responseCommand);
     }
 
 
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java
index 995a4e8..51a4f44 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java
@@ -26,6 +26,7 @@ import 
org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import com.alibaba.fastjson.JSONObject;
+import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand;
 import org.apache.dolphinscheduler.remote.utils.Host;
 import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 import 
org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager;
@@ -183,7 +184,6 @@ public class MasterTaskExecThread extends 
MasterBaseTaskExecThread {
         alreadyKilled = true;
 
         TaskExecutionContext taskExecutionContext = 
super.getTaskExecutionContext(taskInstance);
-
         ExecutionContext executionContext = new 
ExecutionContext(taskExecutionContext, ExecutorType.WORKER);
 
         Host host = new Host();

Reply via email to