This is an automated email from the ASF dual-hosted git repository.
journey pushed a commit to branch refactor-worker
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/refactor-worker by this push:
new e467910 Refactor worker (#1997)
e467910 is described below
commit e4679109af40f874b0579ac8f719fc90a1a95998
Author: Tboy <[email protected]>
AuthorDate: Sat Feb 22 21:39:48 2020 +0800
Refactor worker (#1997)
* Refactor worker (#2)
* Refactor worker (#1993)
* Refactor worker (#1)
* add TaskResponseProcessor (#1983)
* 1, master persistent task 2. extract master and worker communication
model (#1992)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
Co-authored-by: qiaozhanwei <[email protected]>
* updates
Co-authored-by: qiaozhanwei <[email protected]>
* TaskExecutionContext create modify (#1994)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
* TaskExecutionContext create modify
Co-authored-by: qiaozhanwei <[email protected]>
* updates
Co-authored-by: qiaozhanwei <[email protected]>
---
.../dolphinscheduler/common/thread/Stopper.java | 2 +-
.../remote/NettyRemotingClient.java | 92 +++++++++----
.../remote/command/ExecuteTaskAckCommand.java | 2 +-
.../remote/command/ExecuteTaskRequestCommand.java | 2 +-
.../remote/command/ExecuteTaskResponseCommand.java | 2 +-
.../remote/entity/TaskExecutionContext.java | 15 ++-
.../remote/handler/NettyClientHandler.java | 103 ++++++++++-----
.../remote/handler/NettyServerHandler.java | 2 +-
.../dolphinscheduler/remote/utils/Address.java | 96 --------------
.../remote/utils/ChannelUtils.java | 4 +-
.../dolphinscheduler/remote/utils}/Host.java | 13 +-
.../remote/NettyRemotingClientTest.java | 6 +-
.../builder/TaskExecutionContextBuilder.java | 2 +-
.../server/master/MasterServer.java | 3 +-
.../server/master/dispatch/ExecutorDispatcher.java | 77 +++++++++++
.../context/ExecutionContext.java} | 38 +++---
.../enums/ExecutorType.java} | 10 +-
.../dispatch/exceptions/ExecuteException.java | 95 ++++++++++++++
.../executor/AbstractExecutorManager.java} | 17 ++-
.../executor/ExecutorManager.java} | 12 +-
.../dispatch/executor/NettyExecutorManager.java | 144 +++++++++++++++++++++
.../master/{ => dispatch}/host/HostManager.java | 7 +-
.../{ => dispatch}/host/RoundRobinHostManager.java | 34 +++--
.../{ => dispatch}/host/assign/RandomSelector.java | 2 +-
.../host/assign/RoundRobinSelector.java | 6 +-
.../{ => dispatch}/host/assign/Selector.java | 2 +-
...esponseProcessor.java => TaskAckProcessor.java} | 33 ++---
.../master/processor/TaskResponseProcessor.java | 2 -
.../master/runner/MasterBaseTaskExecThread.java | 53 ++++----
.../server/master/runner/MasterExecThread.java | 4 -
.../server/registry/ZookeeperNodeManager.java | 2 +-
.../server/registry/ZookeeperRegistryCenter.java | 2 +-
.../worker/processor/TaskCallbackService.java | 5 +-
.../worker/processor/WorkerRequestProcessor.java | 8 +-
.../server/worker/runner/TaskScheduleThread.java | 22 ++--
.../service/log/LogClientService.java | 9 +-
36 files changed, 625 insertions(+), 303 deletions(-)
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/Stopper.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/Stopper.java
index cad6914..67c2c81 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/Stopper.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/Stopper.java
@@ -34,6 +34,6 @@ public class Stopper {
}
public static final void stop(){
- signal.getAndSet(true);
+ signal.set(true);
}
}
diff --git
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java
index 96258d7..4c5b365 100644
---
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java
+++
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java
@@ -25,6 +25,7 @@ import io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.dolphinscheduler.remote.codec.NettyDecoder;
import org.apache.dolphinscheduler.remote.codec.NettyEncoder;
import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
import org.apache.dolphinscheduler.remote.exceptions.RemotingTimeoutException;
@@ -33,7 +34,8 @@ import
org.apache.dolphinscheduler.remote.future.InvokeCallback;
import org.apache.dolphinscheduler.remote.future.ReleaseSemaphore;
import org.apache.dolphinscheduler.remote.future.ResponseFuture;
import org.apache.dolphinscheduler.remote.handler.NettyClientHandler;
-import org.apache.dolphinscheduler.remote.utils.Address;
+import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
+import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.remote.utils.CallerThreadExecutePolicy;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.slf4j.Logger;
@@ -64,7 +66,7 @@ public class NettyRemotingClient {
/**
* channels
*/
- private final ConcurrentHashMap<Address, Channel> channels = new
ConcurrentHashMap(128);
+ private final ConcurrentHashMap<Host, Channel> channels = new
ConcurrentHashMap(128);
/**
* started flag
@@ -158,17 +160,17 @@ public class NettyRemotingClient {
/**
* async send
- * @param address address
+ * @param host host
* @param command command
* @param timeoutMillis timeoutMillis
* @param invokeCallback callback function
* @throws InterruptedException
* @throws RemotingException
*/
- public void sendAsync(final Address address, final Command command,
+ public void sendAsync(final Host host, final Command command,
final long timeoutMillis,
final InvokeCallback invokeCallback) throws
InterruptedException, RemotingException {
- final Channel channel = getChannel(address);
+ final Channel channel = getChannel(host);
if (channel == null) {
throw new RemotingException("network error");
}
@@ -214,7 +216,7 @@ public class NettyRemotingClient {
});
} catch (Throwable ex){
responseFuture.release();
- throw new RemotingException(String.format("send command to
address: %s failed", address), ex);
+ throw new RemotingException(String.format("send command to
host: %s failed", host), ex);
}
} else{
String message = String.format("try to acquire async semaphore
timeout: %d, waiting thread num: %d, total permits: %d",
@@ -225,17 +227,17 @@ public class NettyRemotingClient {
/**
* sync send
- * @param address address
+ * @param host host
* @param command command
* @param timeoutMillis timeoutMillis
* @return command
* @throws InterruptedException
* @throws RemotingException
*/
- public Command sendSync(final Address address, final Command command,
final long timeoutMillis) throws InterruptedException, RemotingException {
- final Channel channel = getChannel(address);
+ public Command sendSync(final Host host, final Command command, final long
timeoutMillis) throws InterruptedException, RemotingException {
+ final Channel channel = getChannel(host);
if (channel == null) {
- throw new RemotingException(String.format("connect to : %s fail",
address));
+ throw new RemotingException(String.format("connect to : %s fail",
host));
}
final long opaque = command.getOpaque();
final ResponseFuture responseFuture = new ResponseFuture(opaque,
timeoutMillis, null, null);
@@ -250,7 +252,7 @@ public class NettyRemotingClient {
}
responseFuture.setCause(future.cause());
responseFuture.putResponse(null);
- logger.error("send command {} to address {} failed", command,
address);
+ logger.error("send command {} to host {} failed", command,
host);
}
});
/**
@@ -259,49 +261,89 @@ public class NettyRemotingClient {
Command result = responseFuture.waitResponse();
if(result == null){
if(responseFuture.isSendOK()){
- throw new RemotingTimeoutException(address.toString(),
timeoutMillis, responseFuture.getCause());
+ throw new RemotingTimeoutException(host.toString(),
timeoutMillis, responseFuture.getCause());
} else{
- throw new RemotingException(address.toString(),
responseFuture.getCause());
+ throw new RemotingException(host.toString(),
responseFuture.getCause());
}
}
return result;
}
+ public void send(final Host host, final Command command) throws
RemotingException {
+ Channel channel = getChannel(host);
+ if (channel == null) {
+ throw new RemotingException(String.format("connect to : %s fail",
host));
+ }
+ try {
+ ChannelFuture future = channel.writeAndFlush(command).await();
+ if (future.isSuccess()) {
+ logger.debug("send command : {} , to : {} successfully.",
command, host.getAddress());
+ } else {
+ String msg = String.format("send command : %s , to :%s
failed", command, host.getAddress());
+ logger.error(msg, future.cause());
+ throw new RemotingException(msg);
+ }
+ } catch (Exception e) {
+ logger.error("Send command {} to address {} encounter error.",
command, host.getAddress());
+ throw new RemotingException(String.format("Send command : %s , to
:%s encounter error", command, host.getAddress()), e);
+ }
+ }
+
+ /**
+ * register processor
+ * @param commandType command type
+ * @param processor processor
+ */
+ public void registerProcessor(final CommandType commandType, final
NettyRequestProcessor processor) {
+ this.registerProcessor(commandType, processor, null);
+ }
+
+ /**
+ * register processor
+ *
+ * @param commandType command type
+ * @param processor processor
+ * @param executor thread executor
+ */
+ public void registerProcessor(final CommandType commandType, final
NettyRequestProcessor processor, final ExecutorService executor) {
+ this.clientHandler.registerProcessor(commandType, processor, executor);
+ }
+
/**
* get channel
- * @param address
+ * @param host
* @return
*/
- public Channel getChannel(Address address) {
- Channel channel = channels.get(address);
+ public Channel getChannel(Host host) {
+ Channel channel = channels.get(host);
if(channel != null && channel.isActive()){
return channel;
}
- return createChannel(address, true);
+ return createChannel(host, true);
}
/**
* create channel
- * @param address address
+ * @param host host
* @param isSync sync flag
* @return channel
*/
- public Channel createChannel(Address address, boolean isSync) {
+ public Channel createChannel(Host host, boolean isSync) {
ChannelFuture future;
try {
synchronized (bootstrap){
- future = bootstrap.connect(new
InetSocketAddress(address.getHost(), address.getPort()));
+ future = bootstrap.connect(new InetSocketAddress(host.getIp(),
host.getPort()));
}
if(isSync){
future.sync();
}
if (future.isSuccess()) {
Channel channel = future.channel();
- channels.put(address, channel);
+ channels.put(host, channel);
return channel;
}
} catch (Exception ex) {
- logger.info("connect to {} error {}", address, ex);
+ logger.info("connect to {} error {}", host, ex);
}
return null;
}
@@ -341,10 +383,10 @@ public class NettyRemotingClient {
/**
* close channel
- * @param address address
+ * @param host host
*/
- public void closeChannel(Address address){
- Channel channel = this.channels.remove(address);
+ public void closeChannel(Host host){
+ Channel channel = this.channels.remove(host);
if(channel != null){
channel.close();
}
diff --git
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskAckCommand.java
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskAckCommand.java
index 7f7da0e..8c50a25 100644
---
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskAckCommand.java
+++
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskAckCommand.java
@@ -1 +1 @@
-/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import java.io.Serializable;
import java.util.Date;
/**
* execute task
request command
*/
public class ExecuteTaskAckCommand implements Serializable {
private int taskInstanceId;
private Date startTime;
private String host;
private int status;
private String logPath;
private String executePath;
public Date getStartTime() {
return startTime;
}
public void setStartTime(Date startTime) {
this.startTime = startTime;
}
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public int getStatus() {
return status;
}
public void setStatus(int status) {
this.status = status;
}
public int getTaskInstanceId() {
return taskInstanceId;
}
public void setTaskInstanceId(int taskInstanceId) {
this.taskInstanceId = taskInstanceId;
}
public String getLogPath() {
return logPath;
}
public void setLogPath(String logPath) {
this.logPath = lo
gPath;
}
public String getExecutePath() {
return executePath;
}
public void setExecutePath(String executePath) {
this.executePath = executePath;
}
/**
* package request command
*
* @return command
*/
public Command convert2Command(long opaque){
Command command = new Command(opaque);
command.setType(CommandType.EXECUTE_TASK_ACK);
byte[] body = FastJsonSerializer.serialize(this);
command.setBody(body);
return command;
}
@Override
public String toString() {
return "ExecuteTaskAckCommand{" +
"taskInstanceId=" + taskInstanceId +
", startTime=" + startTime +
", host='" + host + '\'' +
", status=" + status +
", logPath='" + logPath + '\'' +
", executePath='" + executePath + '\'' +
'}';
}
}
\ No newline at end of file
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import java.io.Serializable;
import java.util.Date;
/**
* execute task
request command
*/
public class ExecuteTaskAckCommand implements Serializable {
private int taskInstanceId;
private Date startTime;
private String host;
private int status;
private String logPath;
private String executePath;
public Date getStartTime() {
return startTime;
}
public void setStartTime(Date startTime) {
this.startTime = startTime;
}
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public int getStatus() {
return status;
}
public void setStatus(int status) {
this.status = status;
}
public int getTaskInstanceId() {
return taskInstanceId;
}
public void setTaskInstanceId(int taskInstanceId) {
this.taskInstanceId = taskInstanceId;
}
public String getLogPath() {
return logPath;
}
public void setLogPath(String logPath) {
this.logPath = lo
gPath;
}
public String getExecutePath() {
return executePath;
}
public void setExecutePath(String executePath) {
this.executePath = executePath;
}
/**
* package request command
*
* @return command
*/
public Command convert2Command(){
Command command = new Command();
command.setType(CommandType.EXECUTE_TASK_ACK);
byte[] body = FastJsonSerializer.serialize(this);
command.setBody(body);
return command;
}
@Override
public String toString() {
return "ExecuteTaskAckCommand{" +
"taskInstanceId=" + taskInstanceId +
", startTime=" + startTime +
", host='" + host + '\'' +
", status=" + status +
", logPath='" + logPath + '\'' +
", executePath='" + executePath + '\'' +
'}';
}
}
\ No newline at end of file
diff --git
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java
index e1556f3..e7564ed 100644
---
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java
+++
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java
@@ -1 +1 @@
-/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import java.io.Serializable;
/**
* execute task request command
*/
pub
lic class ExecuteTaskRequestCommand implements Serializable {
/**
* task instance json
*/
private String taskInfoJson;
public String getTaskInfoJson() {
return taskInfoJson;
}
public void setTaskInfoJson(String taskInfoJson) {
this.taskInfoJson = taskInfoJson;
}
public ExecuteTaskRequestCommand() {
}
public ExecuteTaskRequestCommand(String taskInfoJson) {
this.taskInfoJson = taskInfoJson;
}
/**
* package request command
*
* @return command
*/
public Command convert2Command(){
Command command = new Command();
command.setType(CommandType.EXECUTE_TASK_REQUEST);
byte[] body = FastJsonSerializer.serialize(this);
command.setBody(body);
return command;
}
@Override
public String toString() {
return "ExecuteTaskRequestCommand{" +
"taskInfoJson='" + taskInfoJson + '\'' +
'}';
}
}
\ No newline at end of file
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import java.io.Serializable;
/**
* execute task request command
*/
pub
lic class ExecuteTaskRequestCommand implements Serializable {
/**
* task execution context
*/
private String taskExecutionContext;
public String getTaskExecutionContext() {
return taskExecutionContext;
}
public void setTaskExecutionContext(String taskExecutionContext) {
this.taskExecutionContext = taskExecutionContext;
}
public ExecuteTaskRequestCommand() {
}
public ExecuteTaskRequestCommand(String taskExecutionContext) {
this.taskExecutionContext = taskExecutionContext;
}
/**
* package request command
*
* @return command
*/
public Command convert2Command(){
Command command = new Command();
command.setType(CommandType.EXECUTE_TASK_REQUEST);
byte[] body = FastJsonSerializer.serialize(this);
command.setBody(body);
return command;
}
@Override
public String toString() {
return "ExecuteTaskRequestCommand{" +
"taskExecutionContext='" + taskExecutionContext + '\'' +
'}';
}
}
\ No newline at end of file
diff --git
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskResponseCommand.java
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskResponseCommand.java
index 8193c9d..6bbc2f7 100644
---
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskResponseCommand.java
+++
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskResponseCommand.java
@@ -1 +1 @@
-/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import java.io.Serializable;
import java.util.Date;
/**
* execute task
response command
*/
public class ExecuteTaskResponseCommand implements Serializable {
public ExecuteTaskResponseCommand() {
}
public ExecuteTaskResponseCommand(int taskInstanceId) {
this.taskInstanceId = taskInstanceId;
}
/**
* task instance id
*/
private int taskInstanceId;
/**
* status
*/
private int status;
/**
* end time
*/
private Date endTime;
public int getTaskInstanceId() {
return taskInstanceId;
}
public void setTaskInstanceId(int taskInstanceId) {
this.taskInstanceId = taskInstanceId;
}
public int getStatus() {
return status;
}
public void setStatus(int status) {
this.status = status;
}
public Date getEndTime() {
return endTime;
}
public void setEndTime(Date endTime) {
this.endTime = endTime;
}
/**
* package response command
*
* @param opaque request unique identif
ication
* @return command
*/
public Command convert2Command(long opaque){
Command command = new Command(opaque);
command.setType(CommandType.EXECUTE_TASK_RESPONSE);
byte[] body = FastJsonSerializer.serialize(this);
command.setBody(body);
return command;
}
}
\ No newline at end of file
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import java.io.Serializable;
import java.util.Date;
/**
* execute task
response command
*/
public class ExecuteTaskResponseCommand implements Serializable {
public ExecuteTaskResponseCommand() {
}
public ExecuteTaskResponseCommand(int taskInstanceId) {
this.taskInstanceId = taskInstanceId;
}
/**
* task instance id
*/
private int taskInstanceId;
/**
* status
*/
private int status;
/**
* end time
*/
private Date endTime;
public int getTaskInstanceId() {
return taskInstanceId;
}
public void setTaskInstanceId(int taskInstanceId) {
this.taskInstanceId = taskInstanceId;
}
public int getStatus() {
return status;
}
public void setStatus(int status) {
this.status = status;
}
public Date getEndTime() {
return endTime;
}
public void setEndTime(Date endTime) {
this.endTime = endTime;
}
/**
* package response command
* @return command
*/
public Command
convert2Command(){
Command command = new Command();
command.setType(CommandType.EXECUTE_TASK_RESPONSE);
byte[] body = FastJsonSerializer.serialize(this);
command.setBody(body);
return command;
}
}
\ No newline at end of file
diff --git
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/entity/TaskExecutionContext.java
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/entity/TaskExecutionContext.java
index 783d166..e3da43a 100644
---
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/entity/TaskExecutionContext.java
+++
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/entity/TaskExecutionContext.java
@@ -26,9 +26,9 @@ import java.util.Date;
public class TaskExecutionContext implements Serializable{
/**
- * task instance id
+ * task id
*/
- private Integer taskId;
+ private Integer taskInstanceId;
/**
@@ -107,12 +107,13 @@ public class TaskExecutionContext implements Serializable{
*/
private Integer projectId;
- public Integer getTaskId() {
- return taskId;
+
+ public Integer getTaskInstanceId() {
+ return taskInstanceId;
}
- public void setTaskId(Integer taskId) {
- this.taskId = taskId;
+ public void setTaskInstanceId(Integer taskInstanceId) {
+ this.taskInstanceId = taskInstanceId;
}
public String getTaskName() {
@@ -230,7 +231,7 @@ public class TaskExecutionContext implements Serializable{
@Override
public String toString() {
return "TaskExecutionContext{" +
- "taskId=" + taskId +
+ "taskInstanceId=" + taskInstanceId +
", taskName='" + taskName + '\'' +
", startTime=" + startTime +
", taskType='" + taskType + '\'' +
diff --git
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java
index d5d0d4d..48d78d9 100644
---
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java
+++
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java
@@ -19,12 +19,19 @@ package org.apache.dolphinscheduler.remote.handler;
import io.netty.channel.*;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.future.ResponseFuture;
+import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
+import org.apache.dolphinscheduler.remote.utils.Constants;
+import org.apache.dolphinscheduler.remote.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
/**
* netty client request handler
@@ -44,9 +51,20 @@ public class NettyClientHandler extends
ChannelInboundHandlerAdapter {
*/
private final ExecutorService callbackExecutor;
+ /**
+ * processors
+ */
+ private final ConcurrentHashMap<CommandType, Pair<NettyRequestProcessor,
ExecutorService>> processors;
+
+ /**
+ * default executor
+ */
+ private final ExecutorService defaultExecutor =
Executors.newFixedThreadPool(Constants.CPUS);
+
public NettyClientHandler(NettyRemotingClient nettyRemotingClient,
ExecutorService callbackExecutor){
this.nettyRemotingClient = nettyRemotingClient;
this.callbackExecutor = callbackExecutor;
+ this.processors = new ConcurrentHashMap();
}
/**
@@ -71,18 +89,43 @@ public class NettyClientHandler extends
ChannelInboundHandlerAdapter {
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws
Exception {
- processReceived((Command)msg);
+ processReceived(ctx.channel(), (Command)msg);
+ }
+
+ /**
+ * register processor
+ *
+ * @param commandType command type
+ * @param processor processor
+ */
+ public void registerProcessor(final CommandType commandType, final
NettyRequestProcessor processor) {
+ this.registerProcessor(commandType, processor, null);
+ }
+
+ /**
+ * register processor
+ *
+ * @param commandType command type
+ * @param processor processor
+ * @param executor thread executor
+ */
+ public void registerProcessor(final CommandType commandType, final
NettyRequestProcessor processor, final ExecutorService executor) {
+ ExecutorService executorRef = executor;
+ if(executorRef == null){
+ executorRef = defaultExecutor;
+ }
+ this.processors.putIfAbsent(commandType, new Pair<>(processor,
executorRef));
}
/**
* process received logic
*
- * @param responseCommand responseCommand
+ * @param command command
*/
- private void processReceived(final Command responseCommand) {
- ResponseFuture future =
ResponseFuture.getFuture(responseCommand.getOpaque());
+ private void processReceived(final Channel channel, final Command command)
{
+ ResponseFuture future = ResponseFuture.getFuture(command.getOpaque());
if(future != null){
- future.setResponseCommand(responseCommand);
+ future.setResponseCommand(command);
future.release();
if(future.getInvokeCallback() != null){
this.callbackExecutor.submit(new Runnable() {
@@ -92,10 +135,30 @@ public class NettyClientHandler extends
ChannelInboundHandlerAdapter {
}
});
} else{
- future.putResponse(responseCommand);
+ future.putResponse(command);
}
} else{
- logger.warn("receive response {}, but not matched any request ",
responseCommand);
+ processByCommandType(channel, command);
+ }
+ }
+
+ public void processByCommandType(final Channel channel, final Command
command) {
+ final Pair<NettyRequestProcessor, ExecutorService> pair =
processors.get(command.getType());
+ if (pair != null) {
+ Runnable run = () -> {
+ try {
+ pair.getLeft().process(channel, command);
+ } catch (Throwable e) {
+ logger.error(String.format("process command %s exception",
command), e);
+ }
+ };
+ try {
+ pair.getRight().submit(run);
+ } catch (RejectedExecutionException e) {
+ logger.warn("thread pool is full, discard command {} from {}",
command, ChannelUtils.getRemoteAddress(channel));
+ }
+ } else {
+ logger.warn("receive response {}, but not matched any request ",
command);
}
}
@@ -112,30 +175,4 @@ public class NettyClientHandler extends
ChannelInboundHandlerAdapter {
ctx.channel().close();
}
- /**
- * channel write changed
- *
- * @param ctx channel handler context
- * @throws Exception
- */
- @Override
- public void channelWritabilityChanged(ChannelHandlerContext ctx) throws
Exception {
- Channel ch = ctx.channel();
- ChannelConfig config = ch.config();
-
- if (!ch.isWritable()) {
- if (logger.isWarnEnabled()) {
- logger.warn("{} is not writable, over high water level : {}",
- new Object[]{ch,
config.getWriteBufferHighWaterMark()});
- }
-
- config.setAutoRead(false);
- } else {
- if (logger.isWarnEnabled()) {
- logger.warn("{} is writable, to low water : {}",
- new Object[]{ch, config.getWriteBufferLowWaterMark()});
- }
- config.setAutoRead(true);
- }
- }
}
\ No newline at end of file
diff --git
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyServerHandler.java
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyServerHandler.java
index eabd656..2a4f784 100644
---
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyServerHandler.java
+++
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyServerHandler.java
@@ -98,7 +98,7 @@ public class NettyServerHandler extends
ChannelInboundHandlerAdapter {
if(executorRef == null){
executorRef = nettyRemotingServer.getDefaultExecutor();
}
- this.processors.putIfAbsent(commandType, new
Pair<NettyRequestProcessor, ExecutorService>(processor, executorRef));
+ this.processors.putIfAbsent(commandType, new Pair<>(processor,
executorRef));
}
/**
diff --git
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Address.java
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Address.java
deleted file mode 100644
index f61dcd6..0000000
---
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Address.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dolphinscheduler.remote.utils;
-
-import java.io.Serializable;
-
-/**
- * server address
- */
-public class Address implements Serializable {
-
- /**
- * host
- */
- private String host;
-
- /**
- * port
- */
- private int port;
-
- public Address(){
- //NOP
- }
-
- public Address(String host, int port){
- this.host = host;
- this.port = port;
- }
-
- public String getHost() {
- return host;
- }
-
- public void setHost(String host) {
- this.host = host;
- }
-
- public int getPort() {
- return port;
- }
-
- public void setPort(int port) {
- this.port = port;
- }
-
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + ((host == null) ? 0 : host.hashCode());
- result = prime * result + port;
- return result;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj) {
- return true;
- }
- if (obj == null) {
- return false;
- }
- if (getClass() != obj.getClass()) {
- return false;
- }
- Address other = (Address) obj;
- if (host == null) {
- if (other.host != null) {
- return false;
- }
- } else if (!host.equals(other.host)) {
- return false;
- }
- return port == other.port;
- }
-
- @Override
- public String toString() {
- return "Address [host=" + host + ", port=" + port + "]";
- }
-}
diff --git
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/ChannelUtils.java
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/ChannelUtils.java
index d7af5fe..138a8f0 100644
---
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/ChannelUtils.java
+++
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/ChannelUtils.java
@@ -49,9 +49,9 @@ public class ChannelUtils {
* @param channel channel
* @return address
*/
- public static Address toAddress(Channel channel){
+ public static Host toAddress(Channel channel){
InetSocketAddress socketAddress =
((InetSocketAddress)channel.remoteAddress());
- return new Address(socketAddress.getAddress().getHostAddress(),
socketAddress.getPort());
+ return new Host(socketAddress.getAddress().getHostAddress(),
socketAddress.getPort());
}
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/Host.java
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java
similarity index 90%
rename from
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/Host.java
rename to
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java
index 57e64c1..f53c611 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/Host.java
+++
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java
@@ -14,14 +14,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.dolphinscheduler.remote.utils;
-package org.apache.dolphinscheduler.server.master.host;
-
-
+import java.io.Serializable;
import java.util.Objects;
-
-public class Host {
+/**
+ * server address
+ */
+public class Host implements Serializable {
private String address;
@@ -67,7 +68,7 @@ public class Host {
public static Host of(String address){
String[] parts = address.split(":");
if (parts.length != 2) {
- throw new IllegalArgumentException(String.format("Address : %s
illegal.", address));
+ throw new IllegalArgumentException(String.format("Host : %s
illegal.", address));
}
Host host = new Host(parts[0], Integer.parseInt(parts[1]));
return host;
diff --git
a/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/NettyRemotingClientTest.java
b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/NettyRemotingClientTest.java
index ef46c2c..cfc10b2 100644
---
a/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/NettyRemotingClientTest.java
+++
b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/NettyRemotingClientTest.java
@@ -27,7 +27,7 @@ import
org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.remote.future.InvokeCallback;
import org.apache.dolphinscheduler.remote.future.ResponseFuture;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
-import org.apache.dolphinscheduler.remote.utils.Address;
+import org.apache.dolphinscheduler.remote.utils.Host;
import org.junit.Assert;
import org.junit.Test;
@@ -62,7 +62,7 @@ public class NettyRemotingClientTest {
NettyRemotingClient client = new NettyRemotingClient(clientConfig);
Command commandPing = Ping.create();
try {
- Command response = client.sendSync(new Address("127.0.0.1",
serverConfig.getListenPort()), commandPing, 2000);
+ Command response = client.sendSync(new Host("127.0.0.1",
serverConfig.getListenPort()), commandPing, 2000);
Assert.assertEquals(commandPing.getOpaque(), response.getOpaque());
} catch (Exception e) {
e.printStackTrace();
@@ -93,7 +93,7 @@ public class NettyRemotingClientTest {
Command commandPing = Ping.create();
try {
final AtomicLong opaque = new AtomicLong(0);
- client.sendAsync(new Address("127.0.0.1",
serverConfig.getListenPort()), commandPing, 2000, new InvokeCallback() {
+ client.sendAsync(new Host("127.0.0.1",
serverConfig.getListenPort()), commandPing, 2000, new InvokeCallback() {
@Override
public void operationComplete(ResponseFuture responseFuture) {
opaque.set(responseFuture.getOpaque());
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
index cafd894..a3ddd29 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
@@ -40,7 +40,7 @@ public class TaskExecutionContextBuilder {
* @return TaskExecutionContextBuilder
*/
public TaskExecutionContextBuilder
buildTaskInstanceRelatedInfo(TaskInstance taskInstance){
- taskExecutionContext.setTaskId(taskInstance.getId());
+ taskExecutionContext.setTaskInstanceId(taskInstance.getId());
taskExecutionContext.setTaskName(taskInstance.getName());
taskExecutionContext.setStartTime(taskInstance.getStartTime());
taskExecutionContext.setTaskType(taskInstance.getTaskType());
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
index d0c7bc2..9493b72 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
@@ -27,6 +27,7 @@ import org.apache.dolphinscheduler.remote.NettyRemotingServer;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor;
import
org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor;
import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerThread;
import org.apache.dolphinscheduler.server.zk.ZKMasterClient;
@@ -106,7 +107,6 @@ public class MasterServer implements IStoppable {
public static void main(String[] args) {
Thread.currentThread().setName(Constants.THREAD_NAME_MASTER_SERVER);
new
SpringApplicationBuilder(MasterServer.class).web(WebApplicationType.NONE).run(args);
-
}
/**
@@ -121,6 +121,7 @@ public class MasterServer implements IStoppable {
serverConfig.setListenPort(45678);
this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
this.nettyRemotingServer.registerProcessor(CommandType.EXECUTE_TASK_RESPONSE,
new TaskResponseProcessor(processService));
+
this.nettyRemotingServer.registerProcessor(CommandType.EXECUTE_TASK_ACK, new
TaskAckProcessor(processService));
this.nettyRemotingServer.start();
//
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java
new file mode 100644
index 0000000..2fd303a
--- /dev/null
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.dispatch;
+
+
+import org.apache.dolphinscheduler.common.utils.StringUtils;
+import org.apache.dolphinscheduler.remote.utils.Host;
+import
org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
+import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
+import
org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
+import
org.apache.dolphinscheduler.server.master.dispatch.executor.ExecutorManager;
+import
org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
+import
org.apache.dolphinscheduler.server.master.dispatch.host.RoundRobinHostManager;
+import org.springframework.beans.factory.InitializingBean;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+@Service
+public class ExecutorDispatcher implements InitializingBean {
+
+ @Autowired
+ private NettyExecutorManager nettyExecutorManager;
+
+ @Autowired
+ private RoundRobinHostManager hostManager;
+
+ private final ConcurrentHashMap<ExecutorType, ExecutorManager>
executorManagers;
+
+ public ExecutorDispatcher(){
+ this.executorManagers = new ConcurrentHashMap<>();
+ }
+
+ public void dispatch(final ExecutionContext executeContext) throws
ExecuteException {
+ ExecutorManager executorManager =
this.executorManagers.get(executeContext.getExecutorType());
+ if(executorManager == null){
+ throw new ExecuteException("no ExecutorManager for type : " +
executeContext.getExecutorType());
+ }
+ Host host = hostManager.select(executeContext);
+ if (StringUtils.isEmpty(host.getAddress())) {
+ throw new ExecuteException(String.format("fail to execute : %s due
to no worker ", executeContext.getContext()));
+ }
+ executeContext.setHost(host);
+ executorManager.beforeExecute(executeContext);
+ try {
+ executorManager.execute(executeContext);
+ } finally {
+ executorManager.afterExecute(executeContext);
+ }
+ }
+
+ @Override
+ public void afterPropertiesSet() throws Exception {
+ register(ExecutorType.WORKER, nettyExecutorManager);
+ register(ExecutorType.CLIENT, nettyExecutorManager);
+ }
+
+ public void register(ExecutorType type, ExecutorManager executorManager){
+ executorManagers.put(type, executorManager);
+ }
+}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/assign/RandomSelector.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java
similarity index 52%
copy from
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/assign/RandomSelector.java
copy to
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java
index 3a3f123..4bccba0 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/assign/RandomSelector.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java
@@ -14,32 +14,38 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.dolphinscheduler.server.master.dispatch.context;
-package org.apache.dolphinscheduler.server.master.host.assign;
-import java.util.Collection;
-import java.util.Random;
+import org.apache.dolphinscheduler.remote.utils.Host;
+import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
+public class ExecutionContext {
-public class RandomSelector<T> implements Selector<T> {
+ private Host host;
- private final Random random = new Random();
+ private final Object context;
- @Override
- public T select(final Collection<T> source) {
+ private final ExecutorType executorType;
- if (source == null || source.size() == 0) {
- throw new IllegalArgumentException("Empty source.");
- }
+ public ExecutionContext(Object context, ExecutorType executorType) {
+ this.context = context;
+ this.executorType = executorType;
+ }
- if (source.size() == 1) {
- return (T) source.toArray()[0];
- }
+ public ExecutorType getExecutorType() {
+ return executorType;
+ }
- int size = source.size();
- int randomIndex = random.nextInt(size);
+ public Object getContext() {
+ return context;
+ }
- return (T) source.toArray()[randomIndex];
+ public Host getHost() {
+ return host;
}
+ public void setHost(Host host) {
+ this.host = host;
+ }
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/HostManager.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/enums/ExecutorType.java
similarity index 79%
copy from
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/HostManager.java
copy to
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/enums/ExecutorType.java
index 316ce36..70aaeae 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/HostManager.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/enums/ExecutorType.java
@@ -14,14 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.dolphinscheduler.server.master.dispatch.enums;
-package org.apache.dolphinscheduler.server.master.host;
+public enum ExecutorType {
-import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
-
-public interface HostManager {
-
- Host select(TaskExecutionContext context);
+ WORKER,
+ CLIENT;
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/exceptions/ExecuteException.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/exceptions/ExecuteException.java
new file mode 100644
index 0000000..d8ca50a
--- /dev/null
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/exceptions/ExecuteException.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.dispatch.exceptions;
+
+
+public class ExecuteException extends Exception{
+
+ public ExecuteException() {
+ super();
+ }
+
+ /**
+ * Constructs a new exception with the specified detail message. The
+ * cause is not initialized, and may subsequently be initialized by
+ * a call to {@link #initCause}.
+ *
+ * @param message the detail message. The detail message is saved for
+ * later retrieval by the {@link #getMessage()} method.
+ */
+ public ExecuteException(String message) {
+ super(message);
+ }
+
+ /**
+ * Constructs a new exception with the specified detail message and
+ * cause. <p>Note that the detail message associated with
+ * {@code cause} is <i>not</i> automatically incorporated in
+ * this exception's detail message.
+ *
+ * @param message the detail message (which is saved for later retrieval
+ * by the {@link #getMessage()} method).
+ * @param cause the cause (which is saved for later retrieval by the
+ * {@link #getCause()} method). (A <tt>null</tt> value is
+ * permitted, and indicates that the cause is nonexistent or
+ * unknown.)
+ * @since 1.4
+ */
+ public ExecuteException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ /**
+ * Constructs a new exception with the specified cause and a detail
+ * message of <tt>(cause==null ? null : cause.toString())</tt> (which
+ * typically contains the class and detail message of <tt>cause</tt>).
+ * This constructor is useful for exceptions that are little more than
+ * wrappers for other throwables (for example, {@link
+ * java.security.PrivilegedActionException}).
+ *
+ * @param cause the cause (which is saved for later retrieval by the
+ * {@link #getCause()} method). (A <tt>null</tt> value is
+ * permitted, and indicates that the cause is nonexistent or
+ * unknown.)
+ * @since 1.4
+ */
+ public ExecuteException(Throwable cause) {
+ super(cause);
+ }
+
+ /**
+ * Constructs a new exception with the specified detail message,
+ * cause, suppression enabled or disabled, and writable stack
+ * trace enabled or disabled.
+ *
+ * @param message the detail message.
+ * @param cause the cause. (A {@code null} value is permitted,
+ * and indicates that the cause is nonexistent or unknown.)
+ * @param enableSuppression whether or not suppression is enabled
+ * or disabled
+ * @param writableStackTrace whether or not the stack trace should
+ * be writable
+ * @since 1.7
+ */
+ protected ExecuteException(String message, Throwable cause,
+ boolean enableSuppression,
+ boolean writableStackTrace) {
+ super(message, cause, enableSuppression, writableStackTrace);
+ }
+
+}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/HostManager.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/AbstractExecutorManager.java
similarity index 57%
copy from
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/HostManager.java
copy to
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/AbstractExecutorManager.java
index 316ce36..65ed15e 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/HostManager.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/AbstractExecutorManager.java
@@ -15,13 +15,22 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.server.master.host;
+package org.apache.dolphinscheduler.server.master.dispatch.executor;
+import
org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
+import
org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
-import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
-public interface HostManager {
+public abstract class AbstractExecutorManager implements ExecutorManager{
- Host select(TaskExecutionContext context);
+ @Override
+ public void beforeExecute(ExecutionContext executeContext) throws
ExecuteException {
+ //TODO add time monitor
+ }
+ @Override
+ public void afterExecute(ExecutionContext executeContext) throws
ExecuteException {
+ //TODO add dispatch monitor
+
+ }
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/HostManager.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/ExecutorManager.java
similarity index 61%
copy from
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/HostManager.java
copy to
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/ExecutorManager.java
index 316ce36..98d391e 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/HostManager.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/ExecutorManager.java
@@ -15,13 +15,17 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.server.master.host;
+package org.apache.dolphinscheduler.server.master.dispatch.executor;
+import
org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
+import
org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
-import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
-public interface HostManager {
+public interface ExecutorManager {
- Host select(TaskExecutionContext context);
+ void beforeExecute(ExecutionContext executeContext) throws
ExecuteException;
+ void execute(ExecutionContext executeContext) throws ExecuteException;
+
+ void afterExecute(ExecutionContext executeContext) throws ExecuteException;
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
new file mode 100644
index 0000000..dac8d79
--- /dev/null
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.dispatch.executor;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.dolphinscheduler.remote.NettyRemotingClient;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand;
+import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
+import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
+import org.apache.dolphinscheduler.remote.utils.Host;
+import
org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
+import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
+import
org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
+import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+
+@Service
+public class NettyExecutorManager extends AbstractExecutorManager{
+
+ private final Logger logger =
LoggerFactory.getLogger(NettyExecutorManager.class);
+
+ @Autowired
+ private ZookeeperNodeManager zookeeperNodeManager;
+
+ private final NettyRemotingClient nettyRemotingClient;
+
+ public NettyExecutorManager(){
+ final NettyClientConfig clientConfig = new NettyClientConfig();
+ this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
+ }
+
+ @Override
+ public void execute(ExecutionContext executeContext) throws
ExecuteException {
+ Set<String> allNodes = getAllNodes(executeContext);
+ Set<String> failNodeSet = new HashSet<>();
+ //
+ Command command = buildCommand(executeContext);
+ Host host = executeContext.getHost();
+ boolean success = false;
+ //
+ while (!success) {
+ try {
+ doExecute(host, command);
+ success = true;
+ executeContext.setHost(host);
+ } catch (ExecuteException ex) {
+ logger.error(String.format("execute context : %s error",
executeContext.getContext()), ex);
+ try {
+ failNodeSet.add(host.getAddress());
+ Set<String> tmpAllIps = new HashSet<>(allNodes);
+ Collection<String> remained =
CollectionUtils.subtract(tmpAllIps, failNodeSet);
+ if (remained != null && remained.size() > 0) {
+ host = Host.of(remained.iterator().next());
+ logger.error("retry execute context : {} host : {}",
executeContext.getContext(), host);
+ } else {
+ throw new ExecuteException("fail after try all nodes");
+ }
+ } catch (Throwable t) {
+ throw new ExecuteException("fail after try all nodes");
+ }
+ }
+ }
+ }
+
+ private Command buildCommand(ExecutionContext context) {
+ ExecuteTaskRequestCommand requestCommand = new
ExecuteTaskRequestCommand();
+ ExecutorType executorType = context.getExecutorType();
+ switch (executorType){
+ case WORKER:
+ TaskExecutionContext taskExecutionContext =
(TaskExecutionContext)context.getContext();
+
requestCommand.setTaskExecutionContext(FastJsonSerializer.serializeToString(taskExecutionContext));
+ break;
+ case CLIENT:
+ break;
+ default:
+ throw new IllegalArgumentException("invalid executor type : "
+ executorType);
+
+ }
+ return requestCommand.convert2Command();
+ }
+
+ private void doExecute(final Host host, final Command command) throws
ExecuteException {
+ int retryCount = 3;
+ boolean success = false;
+ do {
+ try {
+ nettyRemotingClient.send(host, command);
+ success = true;
+ } catch (Exception ex) {
+ logger.error(String.format("send command : %s to %s error",
command, host), ex);
+ retryCount--;
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException ignore) {}
+ }
+ } while (retryCount >= 0 && !success);
+
+ if (!success) {
+ throw new ExecuteException(String.format("send command : %s to %s
error", command, host));
+ }
+ }
+
+ private Set<String> getAllNodes(ExecutionContext context){
+ Set<String> nodes = Collections.EMPTY_SET;
+ ExecutorType executorType = context.getExecutorType();
+ switch (executorType){
+ case WORKER:
+ nodes = zookeeperNodeManager.getWorkerNodes();
+ break;
+ case CLIENT:
+ break;
+ default:
+ throw new IllegalArgumentException("invalid executor type : "
+ executorType);
+
+ }
+ return nodes;
+ }
+}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/HostManager.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/HostManager.java
similarity index 77%
rename from
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/HostManager.java
rename to
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/HostManager.java
index 316ce36..8708273 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/HostManager.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/HostManager.java
@@ -15,13 +15,14 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.server.master.host;
+package org.apache.dolphinscheduler.server.master.dispatch.host;
-import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.remote.utils.Host;
+import
org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
public interface HostManager {
- Host select(TaskExecutionContext context);
+ Host select(ExecutionContext context);
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/RoundRobinHostManager.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManager.java
similarity index 62%
rename from
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/RoundRobinHostManager.java
rename to
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManager.java
index 18a4659..1c222b8 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/RoundRobinHostManager.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManager.java
@@ -15,11 +15,14 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.server.master.host;
+package org.apache.dolphinscheduler.server.master.dispatch.host;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
-import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
-import
org.apache.dolphinscheduler.server.master.host.assign.RoundRobinSelector;
+import org.apache.dolphinscheduler.remote.utils.Host;
+import
org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
+import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
+import
org.apache.dolphinscheduler.server.master.dispatch.host.assign.RoundRobinSelector;
+import org.apache.dolphinscheduler.server.master.dispatch.host.assign.Selector;
import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,20 +40,35 @@ public class RoundRobinHostManager implements HostManager {
private final Logger logger =
LoggerFactory.getLogger(RoundRobinHostManager.class);
@Autowired
- private RoundRobinSelector<Host> selector;
-
- @Autowired
private ZookeeperNodeManager zookeeperNodeManager;
+ private final Selector<Host> selector;
+
+ public RoundRobinHostManager(){
+ this.selector = new RoundRobinSelector<>();
+ }
+
@Override
- public Host select(TaskExecutionContext context){
+ public Host select(ExecutionContext context){
Host host = new Host();
- Collection<String> nodes = zookeeperNodeManager.getWorkerNodes();
+ Collection<String> nodes = null;
+ ExecutorType executorType = context.getExecutorType();
+ switch (executorType){
+ case WORKER:
+ nodes = zookeeperNodeManager.getWorkerNodes();
+ break;
+ case CLIENT:
+ break;
+ default:
+ throw new IllegalArgumentException("invalid executorType : " +
executorType);
+
+ }
if(CollectionUtils.isEmpty(nodes)){
return host;
}
List<Host> candidateHosts = new ArrayList<>(nodes.size());
nodes.stream().forEach(node -> candidateHosts.add(Host.of(node)));
+
return selector.select(candidateHosts);
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/assign/RandomSelector.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelector.java
similarity index 95%
rename from
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/assign/RandomSelector.java
rename to
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelector.java
index 3a3f123..cf8c0e8 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/assign/RandomSelector.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelector.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.server.master.host.assign;
+package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
import java.util.Collection;
import java.util.Random;
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/assign/RoundRobinSelector.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelector.java
similarity index 91%
rename from
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/assign/RoundRobinSelector.java
rename to
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelector.java
index d342296..90319de 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/assign/RoundRobinSelector.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelector.java
@@ -14,12 +14,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dolphinscheduler.server.master.host.assign;
+package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
+
+import org.springframework.stereotype.Service;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicInteger;
-
+@Service
public class RoundRobinSelector<T> implements Selector<T> {
private final AtomicInteger index = new AtomicInteger(0);
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/assign/Selector.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/Selector.java
similarity index 92%
rename from
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/assign/Selector.java
rename to
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/Selector.java
index c6772f3..bd7c4ac 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/assign/Selector.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/Selector.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.server.master.host.assign;
+package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
import java.util.Collection;
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
similarity index 59%
copy from
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
copy to
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
index c3b6a05..1103b23 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
@@ -22,45 +22,40 @@ import
org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.utils.Preconditions;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
-import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
+import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
-import org.apache.dolphinscheduler.server.master.future.TaskFuture;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * task response processor
+ * task ack processor
*/
-public class TaskResponseProcessor implements NettyRequestProcessor {
+public class TaskAckProcessor implements NettyRequestProcessor {
- private final Logger logger =
LoggerFactory.getLogger(TaskResponseProcessor.class);
+ private final Logger logger =
LoggerFactory.getLogger(TaskAckProcessor.class);
/**
* process service
*/
private final ProcessService processService;
- public TaskResponseProcessor(ProcessService processService){
+ public TaskAckProcessor(ProcessService processService){
this.processService = processService;
}
- /**
- * task final result response
- * need master process , state persistence
- *
- * @param channel channel
- * @param command command
- */
@Override
public void process(Channel channel, Command command) {
- Preconditions.checkArgument(CommandType.EXECUTE_TASK_RESPONSE ==
command.getType(), String.format("invalid command type : %s",
command.getType()));
- logger.info("received command : {}", command);
- ExecuteTaskResponseCommand responseCommand =
FastJsonSerializer.deserialize(command.getBody(),
ExecuteTaskResponseCommand.class);
-
processService.changeTaskState(ExecutionStatus.of(responseCommand.getStatus()),
responseCommand.getEndTime(), responseCommand.getTaskInstanceId());
- TaskFuture.notify(command);
+ Preconditions.checkArgument(CommandType.EXECUTE_TASK_ACK ==
command.getType(), String.format("invalid command type : %s",
command.getType()));
+ ExecuteTaskAckCommand taskAckCommand =
FastJsonSerializer.deserialize(command.getBody(), ExecuteTaskAckCommand.class);
+ logger.info("taskAckCommand : {}",taskAckCommand);
+
processService.changeTaskState(ExecutionStatus.of(taskAckCommand.getStatus()),
+ taskAckCommand.getStartTime(),
+ taskAckCommand.getHost(),
+ taskAckCommand.getExecutePath(),
+ taskAckCommand.getLogPath(),
+ taskAckCommand.getTaskInstanceId());
}
-
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
index c3b6a05..b62bb77 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
@@ -25,7 +25,6 @@ import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
-import org.apache.dolphinscheduler.server.master.future.TaskFuture;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -59,7 +58,6 @@ public class TaskResponseProcessor implements
NettyRequestProcessor {
logger.info("received command : {}", command);
ExecuteTaskResponseCommand responseCommand =
FastJsonSerializer.deserialize(command.getBody(),
ExecuteTaskResponseCommand.class);
processService.changeTaskState(ExecutionStatus.of(responseCommand.getStatus()),
responseCommand.getEndTime(), responseCommand.getTaskInstanceId());
- TaskFuture.notify(command);
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
index 7106cc6..f675493 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
@@ -32,16 +32,23 @@ import
org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand;
import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
-import org.apache.dolphinscheduler.remote.utils.Address;
+import org.apache.dolphinscheduler.remote.future.InvokeCallback;
+import org.apache.dolphinscheduler.remote.future.ResponseFuture;
+import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher;
+import
org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
+import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
+import
org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.ITaskQueue;
import org.apache.dolphinscheduler.service.queue.TaskQueueFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
import java.util.concurrent.Callable;
@@ -92,9 +99,9 @@ public class MasterBaseTaskExecThread implements
Callable<Boolean> {
/**
- * netty remoting client
+ * executor dispatcher
*/
- private static final NettyRemotingClient nettyRemotingClient = new
NettyRemotingClient(new NettyClientConfig());
+ private ExecutorDispatcher dispatcher;
/**
* constructor of MasterBaseTaskExecThread
@@ -102,13 +109,14 @@ public class MasterBaseTaskExecThread implements
Callable<Boolean> {
* @param processInstance process instance
*/
public MasterBaseTaskExecThread(TaskInstance taskInstance, ProcessInstance
processInstance){
- this.processService = BeanContext.getBean(ProcessService.class);
- this.alertDao = BeanContext.getBean(AlertDao.class);
+ this.processService =
SpringApplicationContext.getBean(ProcessService.class);
+ this.alertDao = SpringApplicationContext.getBean(AlertDao.class);
this.processInstance = processInstance;
this.taskQueue = TaskQueueFactory.getTaskQueueInstance();
this.cancel = false;
this.taskInstance = taskInstance;
this.masterConfig =
SpringApplicationContext.getBean(MasterConfig.class);
+ this.dispatcher =
SpringApplicationContext.getBean(ExecutorDispatcher.class);
}
/**
@@ -126,30 +134,17 @@ public class MasterBaseTaskExecThread implements
Callable<Boolean> {
this.cancel = true;
}
-
- // TODO send task to worker
- public void sendToWorker(TaskInstance taskInstance){
- final Address address = new Address("127.0.0.1", 12346);
-
- ExecuteTaskRequestCommand taskRequestCommand = new
ExecuteTaskRequestCommand(
-
FastJsonSerializer.serializeToString(getTaskExecutionContext(taskInstance)));
+ /**
+ * dispatch task to worker
+ * @param taskInstance
+ */
+ public void dispatch(TaskInstance taskInstance){
+ TaskExecutionContext context = getTaskExecutionContext(taskInstance);
+ ExecutionContext executionContext = new ExecutionContext(context,
ExecutorType.WORKER);
try {
- Command responseCommand = nettyRemotingClient.sendSync(address,
- taskRequestCommand.convert2Command(), 2000);
-
- ExecuteTaskAckCommand taskAckCommand =
FastJsonSerializer.deserialize(
- responseCommand.getBody(), ExecuteTaskAckCommand.class);
-
- logger.info("taskAckCommand : {}",taskAckCommand);
-
processService.changeTaskState(ExecutionStatus.of(taskAckCommand.getStatus()),
- taskAckCommand.getStartTime(),
- taskAckCommand.getHost(),
- taskAckCommand.getExecutePath(),
- taskAckCommand.getLogPath(),
- taskInstance.getId());
-
- } catch (InterruptedException | RemotingException ex) {
- logger.error(String.format("send command to : %s error", address),
ex);
+ dispatcher.dispatch(executionContext);
+ } catch (ExecuteException e) {
+ logger.error("execute exception", e);
}
}
@@ -239,7 +234,7 @@ public class MasterBaseTaskExecThread implements
Callable<Boolean> {
}
if(submitDB && !submitQueue){
// submit task to queue
- sendToWorker(task);
+ dispatch(task);
submitQueue = true;
}
if(submitDB && submitQueue){
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
index b8bf1c9..d0f4927 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
@@ -33,10 +33,6 @@ import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.utils.DagHelper;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
-import org.apache.dolphinscheduler.remote.command.Command;
-import org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand;
-import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
-import org.apache.dolphinscheduler.remote.utils.Address;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.utils.AlertManager;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java
index e3eacaf..c7a2d0b 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java
@@ -35,7 +35,7 @@ import java.util.concurrent.locks.ReentrantLock;
@Service
-public abstract class ZookeeperNodeManager implements InitializingBean {
+public class ZookeeperNodeManager implements InitializingBean {
private final Logger logger =
LoggerFactory.getLogger(ZookeeperNodeManager.class);
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java
index 96b8424..3364a94 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java
@@ -47,7 +47,7 @@ public class ZookeeperRegistryCenter implements
InitializingBean {
@Override
public void afterPropertiesSet() throws Exception {
-
+ init();
}
public void init() {
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
index cd62e98..632d2f7 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
@@ -76,7 +76,7 @@ public class TaskCallbackService {
*/
public void sendAck(int taskInstanceId, ExecuteTaskAckCommand ackCommand){
CallbackChannel callbackChannel = getCallbackChannel(taskInstanceId);
-
callbackChannel.getChannel().writeAndFlush(ackCommand.convert2Command(callbackChannel.getOpaque()));
+
callbackChannel.getChannel().writeAndFlush(ackCommand.convert2Command());
}
/**
@@ -87,8 +87,7 @@ public class TaskCallbackService {
*/
public void sendResult(int taskInstanceId, ExecuteTaskResponseCommand
responseCommand){
CallbackChannel callbackChannel = getCallbackChannel(taskInstanceId);
-
callbackChannel.getChannel().writeAndFlush(responseCommand.convert2Command(
- callbackChannel.getOpaque())).addListener(new
ChannelFutureListener(){
+
callbackChannel.getChannel().writeAndFlush(responseCommand.convert2Command()).addListener(new
ChannelFutureListener(){
@Override
public void operationComplete(ChannelFuture future) throws
Exception {
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerRequestProcessor.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerRequestProcessor.java
index 038b8ef..39dc136 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerRequestProcessor.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerRequestProcessor.java
@@ -79,9 +79,9 @@ public class WorkerRequestProcessor implements
NettyRequestProcessor {
ExecuteTaskRequestCommand taskRequestCommand =
FastJsonSerializer.deserialize(
command.getBody(), ExecuteTaskRequestCommand.class);
- String taskInstanceJson = taskRequestCommand.getTaskInfoJson();
+ String contextJson = taskRequestCommand.getTaskExecutionContext();
- TaskExecutionContext taskExecutionContext =
JSONObject.parseObject(taskInstanceJson, TaskExecutionContext.class);
+ TaskExecutionContext taskExecutionContext =
JSONObject.parseObject(contextJson, TaskExecutionContext.class);
// local execute path
String execLocalPath = getExecLocalPath(taskExecutionContext);
@@ -92,7 +92,7 @@ public class WorkerRequestProcessor implements
NettyRequestProcessor {
} catch (Exception ex){
logger.error(String.format("create execLocalPath : %s",
execLocalPath), ex);
}
-
taskCallbackService.addCallbackChannel(taskExecutionContext.getTaskId(),
+
taskCallbackService.addCallbackChannel(taskExecutionContext.getTaskInstanceId(),
new CallbackChannel(channel, command.getOpaque()));
// submit task
@@ -110,6 +110,6 @@ public class WorkerRequestProcessor implements
NettyRequestProcessor {
return FileUtils.getProcessExecDir(taskExecutionContext.getProjectId(),
taskExecutionContext.getProcessDefineId(),
taskExecutionContext.getProcessInstanceId(),
- taskExecutionContext.getTaskId());
+ taskExecutionContext.getTaskInstanceId());
}
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java
index c54842b..b288aea 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java
@@ -93,12 +93,12 @@ public class TaskScheduleThread implements Runnable {
@Override
public void run() {
- ExecuteTaskResponseCommand responseCommand = new
ExecuteTaskResponseCommand(taskExecutionContext.getTaskId());
+ ExecuteTaskResponseCommand responseCommand = new
ExecuteTaskResponseCommand(taskExecutionContext.getTaskInstanceId());
try {
// tell master that task is in executing
ExecuteTaskAckCommand ackCommand =
buildAckCommand(taskExecutionContext.getTaskType());
-
taskInstanceCallbackService.sendAck(taskExecutionContext.getTaskId(),
ackCommand);
+
taskInstanceCallbackService.sendAck(taskExecutionContext.getTaskInstanceId(),
ackCommand);
logger.info("script path : {}",
taskExecutionContext.getExecutePath());
// task node
@@ -118,7 +118,7 @@ public class TaskScheduleThread implements Runnable {
taskExecutionContext.getScheduleTime(),
taskExecutionContext.getTaskName(),
taskExecutionContext.getTaskType(),
- taskExecutionContext.getTaskId(),
+ taskExecutionContext.getTaskInstanceId(),
CommonUtils.getSystemEnvPath(),
taskExecutionContext.getTenantCode(),
taskExecutionContext.getQueue(),
@@ -132,13 +132,13 @@ public class TaskScheduleThread implements Runnable {
taskProps.setTaskAppId(String.format("%s_%s_%s",
taskExecutionContext.getProcessDefineId(),
taskExecutionContext.getProcessInstanceId(),
- taskExecutionContext.getTaskId()));
+ taskExecutionContext.getTaskInstanceId()));
// custom logger
Logger taskLogger =
LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
taskExecutionContext.getProcessDefineId(),
taskExecutionContext.getProcessInstanceId(),
- taskExecutionContext.getTaskId()));
+ taskExecutionContext.getTaskInstanceId()));
task = TaskManager.newTask(taskExecutionContext.getTaskType(),
taskProps,
@@ -156,14 +156,14 @@ public class TaskScheduleThread implements Runnable {
//
responseCommand.setStatus(task.getExitStatus().getCode());
responseCommand.setEndTime(new Date());
- logger.info("task instance id : {},task final status : {}",
taskExecutionContext.getTaskId(), task.getExitStatus());
+ logger.info("task instance id : {},task final status : {}",
taskExecutionContext.getTaskInstanceId(), task.getExitStatus());
}catch (Exception e){
logger.error("task scheduler failure", e);
kill();
responseCommand.setStatus(ExecutionStatus.FAILURE.getCode());
responseCommand.setEndTime(new Date());
} finally {
-
taskInstanceCallbackService.sendResult(taskExecutionContext.getTaskId(),
responseCommand);
+
taskInstanceCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(),
responseCommand);
}
}
@@ -213,13 +213,13 @@ public class TaskScheduleThread implements Runnable {
return baseLog + Constants.SINGLE_SLASH +
taskExecutionContext.getProcessDefineId() +
Constants.SINGLE_SLASH +
taskExecutionContext.getProcessInstanceId() +
Constants.SINGLE_SLASH +
- taskExecutionContext.getTaskId() + ".log";
+ taskExecutionContext.getTaskInstanceId() + ".log";
}
return System.getProperty("user.dir") + Constants.SINGLE_SLASH +
baseLog + Constants.SINGLE_SLASH +
taskExecutionContext.getProcessDefineId() +
Constants.SINGLE_SLASH +
taskExecutionContext.getProcessInstanceId() +
Constants.SINGLE_SLASH +
- taskExecutionContext.getTaskId() + ".log";
+ taskExecutionContext.getTaskInstanceId() + ".log";
}
/**
@@ -325,9 +325,9 @@ public class TaskScheduleThread implements Runnable {
* @throws Exception exception
*/
private void checkDownloadPermission(List<String> projectRes) throws
Exception {
- int userId = taskExecutionContext.getExecutorId();
+ int executorId = taskExecutionContext.getExecutorId();
String[] resNames = projectRes.toArray(new String[projectRes.size()]);
- PermissionCheck<String> permissionCheck = new
PermissionCheck<>(AuthorizationType.RESOURCE_FILE,
processService,resNames,userId,logger);
+ PermissionCheck<String> permissionCheck = new
PermissionCheck<>(AuthorizationType.RESOURCE_FILE,
processService,resNames,executorId,logger);
permissionCheck.checkPermission();
}
}
\ No newline at end of file
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java
index 5daf535..c979eb2 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java
@@ -16,12 +16,11 @@
*/
package org.apache.dolphinscheduler.service.log;
-import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.log.*;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
-import org.apache.dolphinscheduler.remote.utils.Address;
+import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -73,7 +72,7 @@ public class LogClientService {
logger.info("roll view log, host : {}, port : {}, path {}, skipLineNum
{} ,limit {}", host, port, path, skipLineNum, limit);
RollViewLogRequestCommand request = new
RollViewLogRequestCommand(path, skipLineNum, limit);
String result = "";
- final Address address = new Address(host, port);
+ final Host address = new Host(host, port);
try {
Command command = request.convert2Command();
Command response = this.client.sendSync(address, command,
logRequestTimeout);
@@ -101,7 +100,7 @@ public class LogClientService {
logger.info("view log path {}", path);
ViewLogRequestCommand request = new ViewLogRequestCommand(path);
String result = "";
- final Address address = new Address(host, port);
+ final Host address = new Host(host, port);
try {
Command command = request.convert2Command();
Command response = this.client.sendSync(address, command,
logRequestTimeout);
@@ -129,7 +128,7 @@ public class LogClientService {
logger.info("log path {}", path);
GetLogBytesRequestCommand request = new
GetLogBytesRequestCommand(path);
byte[] result = null;
- final Address address = new Address(host, port);
+ final Host address = new Host(host, port);
try {
Command command = request.convert2Command();
Command response = this.client.sendSync(address, command,
logRequestTimeout);