This is an automated email from the ASF dual-hosted git repository.
journey pushed a commit to branch refactor-worker
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/refactor-worker by this push:
new 9dc9745 Refactor worker (#2018)
9dc9745 is described below
commit 9dc9745a80957c56b0dbba4a218c6e3b0cff77ac
Author: Tboy <[email protected]>
AuthorDate: Wed Feb 26 09:57:21 2020 +0800
Refactor worker (#2018)
* Refactor worker (#7)
* Refactor worker (#2000)
* Refactor worker (#2)
* Refactor worker (#1993)
* Refactor worker (#1)
* add TaskResponseProcessor (#1983)
* 1, master persistent task 2. extract master and worker communication
model (#1992)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
Co-authored-by: qiaozhanwei <[email protected]>
* updates
Co-authored-by: qiaozhanwei <[email protected]>
* TaskExecutionContext create modify (#1994)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
* TaskExecutionContext create modify
Co-authored-by: qiaozhanwei <[email protected]>
* updates
* add register processor
Co-authored-by: qiaozhanwei <[email protected]>
* buildAckCommand taskInstanceId not set modify (#2002)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
* TaskExecutionContext create modify
* buildAckCommand taskInstanceId not set modify
* java doc error modify (#2004)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
* TaskExecutionContext create modify
* buildAckCommand taskInstanceId not set modify
* java doc error modify
* add comment (#2006)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
* TaskExecutionContext create modify
* buildAckCommand taskInstanceId not set modify
* java doc error modify
* add comment
* ExecutorManager interface add generic type (#2012)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
* TaskExecutionContext create modify
* buildAckCommand taskInstanceId not set modify
* java doc error modify
* add comment
* ExecutorManager interface add generic type
Co-authored-by: qiaozhanwei <[email protected]>
* Refactor worker (#8)
* Refactor worker (#2000)
* Refactor worker (#2)
* Refactor worker (#1993)
* Refactor worker (#1)
* add TaskResponseProcessor (#1983)
* 1, master persistent task 2. extract master and worker communication
model (#1992)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
Co-authored-by: qiaozhanwei <[email protected]>
* updates
Co-authored-by: qiaozhanwei <[email protected]>
* TaskExecutionContext create modify (#1994)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
* TaskExecutionContext create modify
Co-authored-by: qiaozhanwei <[email protected]>
* updates
* add register processor
Co-authored-by: qiaozhanwei <[email protected]>
* buildAckCommand taskInstanceId not set modify (#2002)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
* TaskExecutionContext create modify
* buildAckCommand taskInstanceId not set modify
* java doc error modify (#2004)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
* TaskExecutionContext create modify
* buildAckCommand taskInstanceId not set modify
* java doc error modify
* add comment (#2006)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
* TaskExecutionContext create modify
* buildAckCommand taskInstanceId not set modify
* java doc error modify
* add comment
* ExecutorManager interface add generic type (#2012)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
* TaskExecutionContext create modify
* buildAckCommand taskInstanceId not set modify
* java doc error modify
* add comment
* ExecutorManager interface add generic type
Co-authored-by: qiaozhanwei <[email protected]>
* add kill command
Co-authored-by: qiaozhanwei <[email protected]>
---
.../remote/command/CommandType.java | 2 +-
.../remote/command/KillTaskRequestCommand.java | 1 +
.../server/worker/WorkerServer.java | 6 +-
...allbackChannel.java => NettyRemoteChannel.java} | 39 ++++++--
.../worker/processor/TaskCallbackService.java | 84 ++++++++++-------
...estProcessor.java => TaskExecuteProcessor.java} | 71 ++++++++++++--
.../server/worker/processor/TaskKillProcessor.java | 103 +++++++++++++++++++++
.../server/worker/runner/FetchTaskThread.java | 2 +-
...kScheduleThread.java => TaskExecuteThread.java} | 71 ++------------
9 files changed, 264 insertions(+), 115 deletions(-)
diff --git
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
index 79ef2d9..053b38f 100644
---
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
+++
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
@@ -1 +1 @@
-/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
/**
 * Type tag of a remoting {@link Command}; the server dispatches each incoming
 * command to the processor registered for its type.
 */
public enum CommandType {
/**
* roll view log request
*/
ROLL_VIEW_LOG_REQUEST,
/**
* roll view log response
*/
ROLL_VIEW_LOG_RESPONSE,
/**
* view whole log request
*/
VIEW_WHOLE_LOG_REQUEST,
/**
* view whole log response
*/
VIEW_WHOLE_LOG_RESPONSE,
/**
* get log bytes request
*/
GET_LOG_BYTES_REQUEST,
/**
* get log bytes response
*/
GET_LOG_BYTES_RESPONSE,
/**
* worker request (meaning not documented in this change — TODO confirm)
*/
WORKER_REQUEST,
/**
* master response (meaning not documented in this change — TODO confirm)
*/
MASTER_RESPONSE,
/**
* execute task request
*/
EXECUTE_TASK_REQUEST,
/**
* execute task ack
*/
EXECUTE_TASK_ACK,
/**
* execute task response
*/
EXECUTE_TASK_RESPONSE,
/**
* ping
*/
PING,
/**
* pong
*/
PONG;
}
\ No newline at end of file
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
/**
 * Type tag of a remoting {@link Command}; the server dispatches each incoming
 * command to the processor registered for its type.
 *
 * NOTE(review): if this enum is ever encoded by ordinal, inserting KILL_TASK_*
 * before PING/PONG changes their ordinals between versions — confirm the wire encoding.
 */
public enum CommandType {
/**
* roll view log request
*/
ROLL_VIEW_LOG_REQUEST,
/**
* roll view log response
*/
ROLL_VIEW_LOG_RESPONSE,
/**
* view whole log request
*/
VIEW_WHOLE_LOG_REQUEST,
/**
* view whole log response
*/
VIEW_WHOLE_LOG_RESPONSE,
/**
* get log bytes request
*/
GET_LOG_BYTES_REQUEST,
/**
* get log bytes response
*/
GET_LOG_BYTES_RESPONSE,
/**
* worker request (meaning not documented in this change — TODO confirm)
*/
WORKER_REQUEST,
/**
* master response (meaning not documented in this change — TODO confirm)
*/
MASTER_RESPONSE,
/**
* execute task request
*/
EXECUTE_TASK_REQUEST,
/**
* execute task ack
*/
EXECUTE_TASK_ACK,
/**
* execute task response
*/
EXECUTE_TASK_RESPONSE,
/**
* kill task request
*/
KILL_TASK_REQUEST,
/**
* kill task response
*/
KILL_TASK_RESPONSE,
/**
* ping
*/
PING,
/**
* pong
*/
PONG;
}
\ No newline at end of file
diff --git
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/KillTaskRequestCommand.java
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/KillTaskRequestCommand.java
new file mode 100644
index 0000000..3ece650
--- /dev/null
+++
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/KillTaskRequestCommand.java
@@ -0,0 +1 @@
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import java.io.Serializable;
/**
* kill task request command
*/
public
class KillTaskRequestCommand implements Serializable {
private int taskInstanceId;
private int processId;
private String host;
private String tenantCode;
private String logPath;
private String executePath;
public String getLogPath() {
return logPath;
}
public void setLogPath(String logPath) {
this.logPath = logPath;
}
public int getTaskInstanceId() {
return taskInstanceId;
}
public void setTaskInstanceId(int taskInstanceId) {
this.taskInstanceId = taskInstanceId;
}
public int getProcessId() {
return processId;
}
public void setProcessId(int processId) {
this.processId = processId;
}
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public String getTenantCode() {
return tenantCode;
}
public void setTenantCode(String tenantCode) {
this.tenantCod
e = tenantCode;
}
public String getExecutePath() {
return executePath;
}
public void setExecutePath(String executePath) {
this.executePath = executePath;
}
/**
* package request command
*
* @return command
*/
public Command convert2Command(){
Command command = new Command();
command.setType(CommandType.KILL_TASK_REQUEST);
byte[] body = FastJsonSerializer.serialize(this);
command.setBody(body);
return command;
}
}
\ No newline at end of file
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
index 2625d68..c2af7b1 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
@@ -35,7 +35,8 @@ import
org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
-import
org.apache.dolphinscheduler.server.worker.processor.WorkerRequestProcessor;
+import
org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor;
+import org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry;
import org.apache.dolphinscheduler.server.zk.ZKWorkerClient;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
@@ -168,7 +169,8 @@ public class WorkerServer implements IStoppable {
//init remoting server
NettyServerConfig serverConfig = new NettyServerConfig();
this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
-
this.nettyRemotingServer.registerProcessor(CommandType.EXECUTE_TASK_REQUEST,
new WorkerRequestProcessor(processService));
+
this.nettyRemotingServer.registerProcessor(CommandType.EXECUTE_TASK_REQUEST,
new TaskExecuteProcessor(processService));
+
this.nettyRemotingServer.registerProcessor(CommandType.KILL_TASK_REQUEST, new
TaskKillProcessor());
this.nettyRemotingServer.start();
this.workerRegistry = new WorkerRegistry(zookeeperRegistryCenter,
serverConfig.getListenPort());
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/CallbackChannel.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/NettyRemoteChannel.java
similarity index 60%
rename from
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/CallbackChannel.java
rename to
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/NettyRemoteChannel.java
index e3d893f..cbb8972 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/CallbackChannel.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/NettyRemoteChannel.java
@@ -18,24 +18,35 @@
package org.apache.dolphinscheduler.server.worker.processor;
import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
+import org.apache.dolphinscheduler.remote.utils.Host;
/**
* callback channel
*/
-public class CallbackChannel {
+public class NettyRemoteChannel {
/**
* channel
*/
- private Channel channel;
+ private final Channel channel;
/**
* equest unique identification
*/
- private long opaque;
+ private final long opaque;
- public CallbackChannel(Channel channel, long opaque) {
+ /**
+ * master host
+ */
+ private final Host host;
+
+
+ /**
+ * @param channel netty channel to wrap
+ * @param opaque opaque id of the request received on {@code channel}
+ */
+ public NettyRemoteChannel(Channel channel, long opaque) {
this.channel = channel;
+ // keep the peer address so a replacement channel to the same host can be obtained later
+ this.host = ChannelUtils.toAddress(channel);
this.opaque = opaque;
}
@@ -43,15 +54,23 @@ public class CallbackChannel {
return channel;
}
- public void setChannel(Channel channel) {
- this.channel = channel;
- }
-
public long getOpaque() {
return opaque;
}
- public void setOpaque(long opaque) {
- this.opaque = opaque;
+    /**
+     * Address of the remote peer, derived from the channel when this object was constructed.
+     *
+     * @return remote host
+     */
+    public Host getHost() {
+        return this.host;
+    }
+
+    /**
+     * Whether the wrapped netty channel is still active.
+     *
+     * @return {@code true} if the channel is active
+     */
+    public boolean isActive() {
+        return channel.isActive();
+    }
+
+    /**
+     * Write {@code command} to the wrapped channel and flush it.
+     *
+     * @param command command to send
+     * @return the future netty returns for this write
+     */
+    public ChannelFuture writeAndFlush(Command command) {
+        final ChannelFuture writeFuture = channel.writeAndFlush(command);
+        return writeFuture;
+    }
+
+    /**
+     * Close the wrapped netty channel; the returned close future is not awaited.
+     */
+    public void close() {
+        channel.close();
}
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
index 632d2f7..23ac7e2 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
@@ -21,8 +21,12 @@ package org.apache.dolphinscheduler.server.worker.processor;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
+import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
+import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.concurrent.ConcurrentHashMap;
@@ -31,18 +35,31 @@ import java.util.concurrent.ConcurrentHashMap;
*/
public class TaskCallbackService {
+ private final Logger logger =
LoggerFactory.getLogger(TaskCallbackService.class);
+
+ /**
+ * remote channels
+ */
+ private static final ConcurrentHashMap<Integer, NettyRemoteChannel>
REMOTE_CHANNELS = new ConcurrentHashMap<>();
+
/**
- * callback channels
+ * netty remoting client
*/
- private static final ConcurrentHashMap<Integer, CallbackChannel>
CALL_BACK_CHANNELS = new ConcurrentHashMap<>();
+ private final NettyRemotingClient nettyRemotingClient;
+
+
+    /**
+     * Create the service with its own netty remoting client, which getRemoteChannel uses
+     * to obtain a channel to a task's host when the registered one is no longer active.
+     * NOTE(review): no shutdown of this client is visible in this change — confirm it is
+     * closed when the worker stops.
+     */
+    public TaskCallbackService(){
+        this.nettyRemotingClient = new NettyRemotingClient(new NettyClientConfig());
+    }
/**
* add callback channel
* @param taskInstanceId taskInstanceId
* @param channel channel
*/
- public void addCallbackChannel(int taskInstanceId, CallbackChannel
channel){
- CALL_BACK_CHANNELS.put(taskInstanceId, channel);
+ public void addRemoteChannel(int taskInstanceId, NettyRemoteChannel
channel){
+ // put() replaces any channel already registered for this task instance id
+ REMOTE_CHANNELS.put(taskInstanceId, channel);
}
/**
@@ -50,15 +67,18 @@ public class TaskCallbackService {
* @param taskInstanceId taskInstanceId
* @return callback channel
*/
- public CallbackChannel getCallbackChannel(int taskInstanceId){
- CallbackChannel callbackChannel =
CALL_BACK_CHANNELS.get(taskInstanceId);
- if(callbackChannel.getChannel().isActive()){
- return callbackChannel;
+ public NettyRemoteChannel getRemoteChannel(int taskInstanceId){
+ // returns the registered channel if still active, otherwise a newly registered channel to the same host, or null if none can be obtained
+ NettyRemoteChannel nettyRemoteChannel =
REMOTE_CHANNELS.get(taskInstanceId);
+ // NOTE(review): get() is null when nothing is registered for this id (e.g. after remove() ran once the result was sent); isActive() below would then throw NullPointerException
+ if(nettyRemoteChannel.isActive()){
+ return nettyRemoteChannel;
+ }
+ // registered channel is inactive: ask the remoting client for a channel to the same host, keeping the original opaque
+ Channel newChannel =
nettyRemotingClient.getChannel(nettyRemoteChannel.getHost());
+ if(newChannel != null){
+ NettyRemoteChannel remoteChannel = new
NettyRemoteChannel(newChannel, nettyRemoteChannel.getOpaque());
+ addRemoteChannel(taskInstanceId, remoteChannel);
+ return remoteChannel;
}
- Channel newChannel = createChannel();
- callbackChannel.setChannel(newChannel);
- CALL_BACK_CHANNELS.put(taskInstanceId, callbackChannel);
- return callbackChannel;
+ return null;
}
/**
@@ -66,7 +86,7 @@ public class TaskCallbackService {
* @param taskInstanceId taskInstanceId
*/
public void remove(int taskInstanceId){
+ // within this class, invoked once a task result has been written successfully (sendResult)
- CALL_BACK_CHANNELS.remove(taskInstanceId);
+ REMOTE_CHANNELS.remove(taskInstanceId);
}
/**
@@ -75,8 +95,12 @@ public class TaskCallbackService {
* @param ackCommand ackCommand
*/
public void sendAck(int taskInstanceId, ExecuteTaskAckCommand ackCommand){
- CallbackChannel callbackChannel = getCallbackChannel(taskInstanceId);
-
callbackChannel.getChannel().writeAndFlush(ackCommand.convert2Command());
+ NettyRemoteChannel nettyRemoteChannel =
getRemoteChannel(taskInstanceId);
+ if(nettyRemoteChannel == null){
+ //TODO
+ // NOTE(review): the ack is silently dropped when no channel is available — at least log it
+ } else{
+ // fire-and-forget: the write future is not checked
+ nettyRemoteChannel.writeAndFlush(ackCommand.convert2Command());
+ }
}
/**
@@ -86,22 +110,20 @@ public class TaskCallbackService {
* @param responseCommand responseCommand
*/
public void sendResult(int taskInstanceId, ExecuteTaskResponseCommand
responseCommand){
- CallbackChannel callbackChannel = getCallbackChannel(taskInstanceId);
-
callbackChannel.getChannel().writeAndFlush(responseCommand.convert2Command()).addListener(new
ChannelFutureListener(){
-
- @Override
- public void operationComplete(ChannelFuture future) throws
Exception {
- if(future.isSuccess()){
- remove(taskInstanceId);
- return;
- }
- }
- });
- }
+ NettyRemoteChannel nettyRemoteChannel =
getRemoteChannel(taskInstanceId);
+ if(nettyRemoteChannel == null){
+ //TODO
+ // NOTE(review): the result is silently dropped when no channel is available, and the REMOTE_CHANNELS entry is not removed on this path — at least log it
+ } else{
+
nettyRemoteChannel.writeAndFlush(responseCommand.convert2Command()).addListener(new
ChannelFutureListener(){
- // TODO
- private Channel createChannel(){
- return null;
+ @Override
+ public void operationComplete(ChannelFuture future) throws
Exception {
+ // only a successful write releases the task's channel; on failure the entry stays registered and nothing is retried here
+ if(future.isSuccess()){
+ remove(taskInstanceId);
+ return;
+ }
+ }
+ });
+ }
}
-
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerRequestProcessor.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
similarity index 55%
rename from
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerRequestProcessor.java
rename to
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
index 39dc136..818e223 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerRequestProcessor.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
@@ -17,32 +17,41 @@
package org.apache.dolphinscheduler.server.worker.processor;
+import ch.qos.logback.classic.LoggerContext;
+import ch.qos.logback.classic.sift.SiftingAppender;
import com.alibaba.fastjson.JSONObject;
import io.netty.channel.Channel;
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.common.enums.TaskType;
+import org.apache.dolphinscheduler.common.log.TaskLogDiscriminator;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.FileUtils;
+import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.Preconditions;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand;
import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
-import org.apache.dolphinscheduler.server.worker.runner.TaskScheduleThread;
+import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Date;
import java.util.concurrent.ExecutorService;
/**
* worker request processor
*/
-public class WorkerRequestProcessor implements NettyRequestProcessor {
+public class TaskExecuteProcessor implements NettyRequestProcessor {
- private final Logger logger =
LoggerFactory.getLogger(WorkerRequestProcessor.class);
+ private final Logger logger =
LoggerFactory.getLogger(TaskExecuteProcessor.class);
/**
* process service
@@ -64,7 +73,7 @@ public class WorkerRequestProcessor implements
NettyRequestProcessor {
*/
private final TaskCallbackService taskCallbackService;
- public WorkerRequestProcessor(ProcessService processService){
+ public TaskExecuteProcessor(ProcessService processService){
this.processService = processService;
this.taskCallbackService = new TaskCallbackService();
this.workerConfig =
SpringApplicationContext.getBean(WorkerConfig.class);
@@ -92,14 +101,62 @@ public class WorkerRequestProcessor implements
NettyRequestProcessor {
} catch (Exception ex){
logger.error(String.format("create execLocalPath : %s",
execLocalPath), ex);
}
-
taskCallbackService.addCallbackChannel(taskExecutionContext.getTaskInstanceId(),
- new CallbackChannel(channel, command.getOpaque()));
+
taskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(),
+ new NettyRemoteChannel(channel, command.getOpaque()));
+ this.doAck(taskExecutionContext);
// submit task
- workerExecService.submit(new TaskScheduleThread(taskExecutionContext,
+ workerExecService.submit(new TaskExecuteThread(taskExecutionContext,
processService, taskCallbackService));
}
+ private void doAck(TaskExecutionContext taskExecutionContext){
+ // tell master that task is in executing
+ ExecuteTaskAckCommand ackCommand =
buildAckCommand(taskExecutionContext);
+ taskCallbackService.sendAck(taskExecutionContext.getTaskInstanceId(),
ackCommand);
+ }
+
+ /**
+ * get task log path
+ * @return log path
+ */
+    private String getTaskLogPath(TaskExecutionContext taskExecutionContext) {
+        // log base configured on the TASKLOGFILE sifting appender of the ROOT logger
+        LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory();
+        SiftingAppender taskLogAppender =
+                (SiftingAppender) loggerContext.getLogger("ROOT").getAppender("TASKLOGFILE");
+        String baseLog = ((TaskLogDiscriminator) taskLogAppender.getDiscriminator()).getLogBase();
+
+        // <processDefineId>/<processInstanceId>/<taskInstanceId>.log, relative to the log base
+        String taskLogFile = taskExecutionContext.getProcessDefineId() + Constants.SINGLE_SLASH
+                + taskExecutionContext.getProcessInstanceId() + Constants.SINGLE_SLASH
+                + taskExecutionContext.getTaskInstanceId() + ".log";
+
+        // an absolute log base is used as-is; a relative one is resolved against user.dir
+        String logRoot = baseLog.startsWith(Constants.SINGLE_SLASH)
+                ? baseLog
+                : System.getProperty("user.dir") + Constants.SINGLE_SLASH + baseLog;
+        return logRoot + Constants.SINGLE_SLASH + taskLogFile;
+    }
+
+ /**
+ * build ack command
+ * @param taskExecutionContext taskExecutionContext
+ * @return ExecuteTaskAckCommand
+ */
+    private ExecuteTaskAckCommand buildAckCommand(TaskExecutionContext taskExecutionContext) {
+        ExecuteTaskAckCommand ackCommand = new ExecuteTaskAckCommand();
+        ackCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
+        // the ack reports the task as running on this host, starting now
+        ackCommand.setStatus(ExecutionStatus.RUNNING_EXEUTION.getCode());
+        ackCommand.setLogPath(getTaskLogPath(taskExecutionContext));
+        ackCommand.setHost(OSUtils.getHost());
+        ackCommand.setStartTime(new Date());
+        // SQL and PROCEDURE tasks report no execute path (presumably they have no local work dir).
+        // Constant-first comparison so a context without a task type no longer throws NullPointerException.
+        boolean withoutExecutePath = TaskType.SQL.name().equals(taskExecutionContext.getTaskType())
+                || TaskType.PROCEDURE.name().equals(taskExecutionContext.getTaskType());
+        ackCommand.setExecutePath(withoutExecutePath ? null : taskExecutionContext.getExecutePath());
+        return ackCommand;
+    }
/**
* get execute local path
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
new file mode 100644
index 0000000..2985374
--- /dev/null
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.processor;
+
+import io.netty.channel.Channel;
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.utils.LoggerUtils;
+import org.apache.dolphinscheduler.common.utils.OSUtils;
+import org.apache.dolphinscheduler.common.utils.Preconditions;
+import org.apache.dolphinscheduler.common.utils.StringUtils;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.command.KillTaskRequestCommand;
+import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
+import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
+import org.apache.dolphinscheduler.server.utils.ProcessUtils;
+import org.apache.dolphinscheduler.service.log.LogClientService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * task kill processor
+ */
+public class TaskKillProcessor implements NettyRequestProcessor {
+
+ private final Logger logger =
LoggerFactory.getLogger(TaskKillProcessor.class);
+
+ @Override
+ public void process(Channel channel, Command command) {
+ Preconditions.checkArgument(CommandType.KILL_TASK_REQUEST ==
command.getType(), String.format("invalid command type : %s",
command.getType()));
+ logger.info("received command : {}", command);
+ KillTaskRequestCommand killCommand =
FastJsonSerializer.deserialize(command.getBody(), KillTaskRequestCommand.class);
+ doKill(killCommand);
+ }
+
+
+ private void doKill(KillTaskRequestCommand killCommand){
+ try {
+ if(killCommand.getProcessId() == 0 ){
+ logger.error("process kill failed, process id :{}, task
id:{}", killCommand.getProcessId(), killCommand.getTaskInstanceId());
+ return;
+ }
+ String cmd = String.format("sudo kill -9 %s",
ProcessUtils.getPidsStr(killCommand.getProcessId()));
+
+ logger.info("process id:{}, cmd:{}", killCommand.getProcessId(),
cmd);
+
+ OSUtils.exeCmd(cmd);
+
+ // find log and kill yarn job
+ killYarnJob(killCommand.getHost(), killCommand.getLogPath(),
killCommand.getExecutePath(), killCommand.getTenantCode());
+
+ } catch (Exception e) {
+ logger.error("kill task failed", e);
+ }
+ }
+
+ public void killYarnJob(String host, String logPath, String executePath,
String tenantCode) {
+ try {
+ Thread.sleep(Constants.SLEEP_TIME_MILLIS);
+ LogClientService logClient = null;
+ String log = null;
+ try {
+ logClient = new LogClientService();
+ log = logClient.viewLog(host, Constants.RPC_PORT, logPath);
+ } finally {
+ if(logClient != null){
+ logClient.close();
+ }
+ }
+ if (StringUtils.isNotEmpty(log)) {
+ List<String> appIds = LoggerUtils.getAppIds(log, logger);
+ if (StringUtils.isEmpty(executePath)) {
+ logger.error("task instance work dir is empty");
+ throw new RuntimeException("task instance work dir is
empty");
+ }
+ if (appIds.size() > 0) {
+ ProcessUtils.cancelApplication(appIds, logger, tenantCode,
executePath);
+ }
+ }
+
+ } catch (Exception e) {
+ logger.error("kill yarn job failure",e);
+ }
+ }
+
+}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/FetchTaskThread.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/FetchTaskThread.java
index 9e0c452..0498848 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/FetchTaskThread.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/FetchTaskThread.java
@@ -235,7 +235,7 @@ public class FetchTaskThread implements Runnable{
logger.info("task : {} ready to submit to task scheduler
thread",taskInstId);
// submit task
-// workerExecService.submit(new
TaskScheduleThread(taskInstance, processService));
+// workerExecService.submit(new
TaskExecuteThread(taskInstance, processService));
// remove node from zk
removeNodeFromTaskQueue(taskQueueStr);
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
similarity index 76%
rename from
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java
rename to
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
index 735e4ba..be89401 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
@@ -17,21 +17,15 @@
package org.apache.dolphinscheduler.server.worker.runner;
-import ch.qos.logback.classic.LoggerContext;
-import ch.qos.logback.classic.sift.SiftingAppender;
import com.alibaba.fastjson.JSONObject;
-import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
-import org.apache.dolphinscheduler.common.enums.TaskType;
-import org.apache.dolphinscheduler.common.log.TaskLogDiscriminator;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter;
import org.apache.dolphinscheduler.common.utils.*;
-import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
@@ -51,12 +45,12 @@ import java.util.stream.Collectors;
/**
* task scheduler thread
*/
-public class TaskScheduleThread implements Runnable {
+public class TaskExecuteThread implements Runnable {
/**
* logger
*/
- private final Logger logger =
LoggerFactory.getLogger(TaskScheduleThread.class);
+ private final Logger logger =
LoggerFactory.getLogger(TaskExecuteThread.class);
/**
* task instance
@@ -74,32 +68,27 @@ public class TaskScheduleThread implements Runnable {
private AbstractTask task;
/**
- * task instance callback service
+ * task callback service
*/
- private TaskCallbackService taskInstanceCallbackService;
+ private TaskCallbackService taskCallbackService;
/**
* constructor
* @param taskExecutionContext taskExecutionContext
* @param processService processService
- * @param taskInstanceCallbackService taskInstanceCallbackService
+ * @param taskCallbackService taskCallbackService
*/
- public TaskScheduleThread(TaskExecutionContext taskExecutionContext,
ProcessService processService, TaskCallbackService taskInstanceCallbackService){
+ public TaskExecuteThread(TaskExecutionContext taskExecutionContext,
ProcessService processService, TaskCallbackService taskCallbackService){
this.processService = processService;
this.taskExecutionContext = taskExecutionContext;
- this.taskInstanceCallbackService = taskInstanceCallbackService;
+ this.taskCallbackService = taskCallbackService;
}
@Override
public void run() {
ExecuteTaskResponseCommand responseCommand = new
ExecuteTaskResponseCommand(taskExecutionContext.getTaskInstanceId());
-
try {
- // tell master that task is in executing
- ExecuteTaskAckCommand ackCommand =
buildAckCommand(taskExecutionContext);
-
taskInstanceCallbackService.sendAck(taskExecutionContext.getTaskInstanceId(),
ackCommand);
-
logger.info("script path : {}",
taskExecutionContext.getExecutePath());
// task node
TaskNode taskNode =
JSONObject.parseObject(taskExecutionContext.getTaskJson(), TaskNode.class);
@@ -163,7 +152,7 @@ public class TaskScheduleThread implements Runnable {
responseCommand.setStatus(ExecutionStatus.FAILURE.getCode());
responseCommand.setEndTime(new Date());
} finally {
-
taskInstanceCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(),
responseCommand);
+
taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(),
responseCommand);
}
}
@@ -184,48 +173,6 @@ public class TaskScheduleThread implements Runnable {
}
/**
- * build ack command
- * @param taskExecutionContext taskExecutionContext
- * @return ExecuteTaskAckCommand
- */
- private ExecuteTaskAckCommand buildAckCommand(TaskExecutionContext
taskExecutionContext) {
- ExecuteTaskAckCommand ackCommand = new ExecuteTaskAckCommand();
- ackCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
- ackCommand.setStatus(ExecutionStatus.RUNNING_EXEUTION.getCode());
- ackCommand.setLogPath(getTaskLogPath());
- ackCommand.setHost(OSUtils.getHost());
- ackCommand.setStartTime(new Date());
- if(taskExecutionContext.getTaskType().equals(TaskType.SQL.name()) ||
taskExecutionContext.getTaskType().equals(TaskType.PROCEDURE.name())){
- ackCommand.setExecutePath(null);
- }else{
- ackCommand.setExecutePath(taskExecutionContext.getExecutePath());
- }
- return ackCommand;
- }
-
- /**
- * get task log path
- * @return log path
- */
- private String getTaskLogPath() {
- String baseLog = ((TaskLogDiscriminator) ((SiftingAppender)
((LoggerContext) LoggerFactory.getILoggerFactory())
- .getLogger("ROOT")
- .getAppender("TASKLOGFILE"))
- .getDiscriminator()).getLogBase();
- if (baseLog.startsWith(Constants.SINGLE_SLASH)){
- return baseLog + Constants.SINGLE_SLASH +
- taskExecutionContext.getProcessDefineId() +
Constants.SINGLE_SLASH +
- taskExecutionContext.getProcessInstanceId() +
Constants.SINGLE_SLASH +
- taskExecutionContext.getTaskInstanceId() + ".log";
- }
- return System.getProperty("user.dir") + Constants.SINGLE_SLASH +
- baseLog + Constants.SINGLE_SLASH +
- taskExecutionContext.getProcessDefineId() +
Constants.SINGLE_SLASH +
- taskExecutionContext.getProcessInstanceId() +
Constants.SINGLE_SLASH +
- taskExecutionContext.getTaskInstanceId() + ".log";
- }
-
- /**
* set task timeout
* @param taskProps
* @param taskNode
@@ -259,8 +206,6 @@ public class TaskScheduleThread implements Runnable {
}
-
-
/**
* kill task
*/