This is an automated email from the ASF dual-hosted git repository.
journey pushed a commit to branch refactor-worker
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/refactor-worker by this push:
new 0dab5cb Refactor worker (#2241)
0dab5cb is described below
commit 0dab5cb04263bb558531a699f1c03a4d88ca811f
Author: Tboy <[email protected]>
AuthorDate: Thu Mar 19 21:31:09 2020 +0800
Refactor worker (#2241)
* let quartz use the same datasource
* move master/worker config from dao.properties to each config
add master/worker registry test
* move mybatis config from application.properties to SpringConnectionFactory
* move mybatis-plus config from application.properties to
SpringConnectionFactory
* refactor TaskCallbackService
---
.../worker/processor/TaskCallbackService.java | 62 ++++++++++++++--------
1 file changed, 40 insertions(+), 22 deletions(-)
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
index 02d889b..a508177 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
@@ -21,13 +21,18 @@ package org.apache.dolphinscheduler.server.worker.processor;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
+import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
+import org.apache.dolphinscheduler.remote.utils.Host;
+import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
@@ -44,6 +49,12 @@ public class TaskCallbackService {
private static final ConcurrentHashMap<Integer, NettyRemoteChannel>
REMOTE_CHANNELS = new ConcurrentHashMap<>();
/**
+ * zookeeper register center
+ */
+ @Autowired
+ private ZookeeperRegistryCenter zookeeperRegistryCenter;
+
+ /**
* netty remoting client
*/
private final NettyRemotingClient nettyRemotingClient;
@@ -75,11 +86,26 @@ public class TaskCallbackService {
}
Channel newChannel =
nettyRemotingClient.getChannel(nettyRemoteChannel.getHost());
if(newChannel != null){
- NettyRemoteChannel remoteChannel = new
NettyRemoteChannel(newChannel, nettyRemoteChannel.getOpaque());
- addRemoteChannel(taskInstanceId, remoteChannel);
- return remoteChannel;
+ return getRemoteChannel(newChannel,
nettyRemoteChannel.getOpaque(), taskInstanceId);
+ }
+ logger.warn("original master : {} is not reachable, random select
master", nettyRemoteChannel.getHost());
+ Set<String> masterNodes =
zookeeperRegistryCenter.getMasterNodesDirectly();
+ if(CollectionUtils.isEmpty(masterNodes)){
+ throw new IllegalStateException("no available master node
exception");
+ }
+ for(String masterNode : masterNodes){
+ newChannel = nettyRemotingClient.getChannel(Host.of(masterNode));
+ if(newChannel != null){
+ return getRemoteChannel(newChannel,
nettyRemoteChannel.getOpaque(), taskInstanceId);
+ }
}
- return null;
+ throw new IllegalStateException(String.format("all available master
nodes : %s are not reachable", masterNodes));
+ }
+
+ private NettyRemoteChannel getRemoteChannel(Channel newChannel, long
opaque, int taskInstanceId){
+ NettyRemoteChannel remoteChannel = new NettyRemoteChannel(newChannel,
opaque);
+ addRemoteChannel(taskInstanceId, remoteChannel);
+ return remoteChannel;
}
/**
@@ -97,11 +123,7 @@ public class TaskCallbackService {
*/
public void sendAck(int taskInstanceId, Command command){
NettyRemoteChannel nettyRemoteChannel =
getRemoteChannel(taskInstanceId);
- if(nettyRemoteChannel == null){
- //TODO
- } else{
- nettyRemoteChannel.writeAndFlush(command);
- }
+ nettyRemoteChannel.writeAndFlush(command);
}
/**
@@ -112,19 +134,15 @@ public class TaskCallbackService {
*/
public void sendResult(int taskInstanceId, Command command){
NettyRemoteChannel nettyRemoteChannel =
getRemoteChannel(taskInstanceId);
- if(nettyRemoteChannel == null){
- //TODO
- } else{
- nettyRemoteChannel.writeAndFlush(command).addListener(new
ChannelFutureListener(){
-
- @Override
- public void operationComplete(ChannelFuture future) throws
Exception {
- if(future.isSuccess()){
- remove(taskInstanceId);
- return;
- }
+ nettyRemoteChannel.writeAndFlush(command).addListener(new
ChannelFutureListener(){
+
+ @Override
+ public void operationComplete(ChannelFuture future) throws
Exception {
+ if(future.isSuccess()){
+ remove(taskInstanceId);
+ return;
}
- });
- }
+ }
+ });
}
}