This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch refactor-worker
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/refactor-worker by this push:
new b52542f master kill task refactor (#2069)
b52542f is described below
commit b52542f250f2339bf100835471b9074d2739d4ab
Author: qiaozhanwei <[email protected]>
AuthorDate: Tue Mar 3 17:58:22 2020 +0800
master kill task refactor (#2069)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
* TaskExecutionContext create modify
* buildAckCommand taskInstanceId not set modify
* java doc error modify
* add comment
* ExecutorManager interface add generic type
* add TaskInstanceCacheManager receive Worker report result
* TaskInstance setExecutePath
* add TaskInstanceCacheManager to receive Worker Task result report
* TaskInstanceCacheManager add remove method
* add license
* add dispatch task method
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* taskInstanceCache is null ,need load from db
* taskInstanceCache is null ,need load from db
* taskInstanceCache is null ,need load from db
* 1,worker TaskPros use TaskExecutionContext replace
2,Master kill Task , KillTaskProcessor modify
* worker remove db
* ShellTask modify
* master persistence processId and appIds
* master persistence processId and appIds
* master add kill task logic
* master add kill task logic
* master add kill task logic
* javadoc error modify
* remove Chinese log
* executeDirectly method add Override
* remote module modify
* TaskKillResponseProcessor command type modify
* create buildKillCommand
---
.../server/master/MasterServer.java | 2 +-
.../dispatch/executor/NettyExecutorManager.java | 28 ++++++++++++++++++++--
.../processor/TaskKillResponseProcessor.java | 3 +--
.../server/master/runner/MasterTaskExecThread.java | 2 +-
4 files changed, 29 insertions(+), 6 deletions(-)
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
index 9c46ad6..6923cd0 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
@@ -128,7 +128,7 @@ public class MasterServer implements IStoppable {
this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE,
new TaskResponseProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, new
TaskAckProcessor());
-
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE,
new TaskKillResponseProcessor());
+
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, new
TaskKillResponseProcessor());
this.nettyRemotingServer.start();
//
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
index 286b0e6..7719cf8 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
@@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
+import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.remote.utils.Host;
@@ -71,7 +72,7 @@ public class NettyExecutorManager extends
AbstractExecutorManager<Boolean>{
*/
this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE,
new TaskResponseProcessor());
this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_ACK, new
TaskAckProcessor());
-
this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE,
new TaskKillResponseProcessor());
+
this.nettyRemotingClient.registerProcessor(CommandType.TASK_KILL_RESPONSE, new
TaskKillResponseProcessor());
}
@@ -130,8 +131,9 @@ public class NettyExecutorManager extends
AbstractExecutorManager<Boolean>{
return success;
}
+ @Override
public void executeDirectly(ExecutionContext context) throws
ExecuteException {
- Command command = buildCommand(context);
+ Command command = buildKillCommand(context);
Host host = context.getHost();
doExecute(host,command);
}
@@ -159,6 +161,28 @@ public class NettyExecutorManager extends
AbstractExecutorManager<Boolean>{
}
/**
+ * build command
+ * @param context context
+ * @return command
+ */
+ private Command buildKillCommand(ExecutionContext context) {
+ TaskKillRequestCommand requestCommand = new TaskKillRequestCommand();
+ ExecutorType executorType = context.getExecutorType();
+ switch (executorType){
+ case WORKER:
+ TaskExecutionContext taskExecutionContext =
context.getContext();
+
requestCommand.setTaskExecutionContext(FastJsonSerializer.serializeToString(taskExecutionContext));
+ break;
+ case CLIENT:
+ break;
+ default:
+ throw new IllegalArgumentException("invalid executor type : "
+ executorType);
+
+ }
+ return requestCommand.convert2Command();
+ }
+
+ /**
* execute logic
* @param host host
* @param command command
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java
index 4986e89..3e8cdfd 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java
@@ -46,8 +46,7 @@ public class TaskKillResponseProcessor implements
NettyRequestProcessor {
Preconditions.checkArgument(CommandType.TASK_KILL_RESPONSE ==
command.getType(), String.format("invalid command type : %s",
command.getType()));
TaskKillResponseCommand responseCommand =
FastJsonSerializer.deserialize(command.getBody(),
TaskKillResponseCommand.class);
- logger.info("received command : {}", responseCommand);
- logger.info("已经接受到了worker杀任务的回应");
+ logger.info("received task kill response command : {}",
responseCommand);
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java
index 995a4e8..51a4f44 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java
@@ -26,6 +26,7 @@ import
org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import com.alibaba.fastjson.JSONObject;
+import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import
org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager;
@@ -183,7 +184,6 @@ public class MasterTaskExecThread extends
MasterBaseTaskExecThread {
alreadyKilled = true;
TaskExecutionContext taskExecutionContext =
super.getTaskExecutionContext(taskInstance);
-
ExecutionContext executionContext = new
ExecutionContext(taskExecutionContext, ExecutorType.WORKER);
Host host = new Host();