This is an automated email from the ASF dual-hosted git repository.

journey pushed a commit to branch refactor-worker
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/refactor-worker by this push:
     new 0dab5cb  Refactor worker (#2241)
0dab5cb is described below

commit 0dab5cb04263bb558531a699f1c03a4d88ca811f
Author: Tboy <[email protected]>
AuthorDate: Thu Mar 19 21:31:09 2020 +0800

    Refactor worker (#2241)
    
    * let quartz use the same datasource
    
    * move master/worker config from dao.properties to each config
    add master/worker registry test
    
    * move mybatis config from application.properties to SpringConnectionFactory
    
    * move mybatis-plus config from application.properties to SpringConnectionFactory
    
    * refactor TaskCallbackService
---
 .../worker/processor/TaskCallbackService.java      | 62 ++++++++++++++--------
 1 file changed, 40 insertions(+), 22 deletions(-)

diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
index 02d889b..a508177 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
@@ -21,13 +21,18 @@ package org.apache.dolphinscheduler.server.worker.processor;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
+import org.apache.dolphinscheduler.common.utils.CollectionUtils;
 import org.apache.dolphinscheduler.remote.NettyRemotingClient;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
+import org.apache.dolphinscheduler.remote.utils.Host;
+import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
@@ -44,6 +49,12 @@ public class TaskCallbackService {
     private static final ConcurrentHashMap<Integer, NettyRemoteChannel> REMOTE_CHANNELS = new ConcurrentHashMap<>();
 
     /**
+     * zookeeper register center
+     */
+    @Autowired
+    private ZookeeperRegistryCenter zookeeperRegistryCenter;
+
+    /**
      * netty remoting client
      */
     private final NettyRemotingClient nettyRemotingClient;
@@ -75,11 +86,26 @@ public class TaskCallbackService {
         }
         Channel newChannel = nettyRemotingClient.getChannel(nettyRemoteChannel.getHost());
         if(newChannel != null){
-            NettyRemoteChannel remoteChannel = new NettyRemoteChannel(newChannel, nettyRemoteChannel.getOpaque());
-            addRemoteChannel(taskInstanceId, remoteChannel);
-            return remoteChannel;
+            return getRemoteChannel(newChannel, nettyRemoteChannel.getOpaque(), taskInstanceId);
+        }
+        logger.warn("original master : {} is not reachable, random select master", nettyRemoteChannel.getHost());
+        Set<String> masterNodes = zookeeperRegistryCenter.getMasterNodesDirectly();
+        if(CollectionUtils.isEmpty(masterNodes)){
+            throw new IllegalStateException("no available master node exception");
+        }
+        for(String masterNode : masterNodes){
+            newChannel = nettyRemotingClient.getChannel(Host.of(masterNode));
+            if(newChannel != null){
+                return getRemoteChannel(newChannel, nettyRemoteChannel.getOpaque(), taskInstanceId);
+            }
         }
-        return null;
+        throw new IllegalStateException(String.format("all available master nodes : %s are not reachable", masterNodes));
+    }
+
+    private NettyRemoteChannel getRemoteChannel(Channel newChannel, long opaque, int taskInstanceId){
+        NettyRemoteChannel remoteChannel = new NettyRemoteChannel(newChannel, opaque);
+        addRemoteChannel(taskInstanceId, remoteChannel);
+        return remoteChannel;
     }
 
     /**
@@ -97,11 +123,7 @@ public class TaskCallbackService {
      */
     public void sendAck(int taskInstanceId, Command command){
         NettyRemoteChannel nettyRemoteChannel = getRemoteChannel(taskInstanceId);
-        if(nettyRemoteChannel == null){
-            //TODO
-        } else{
-            nettyRemoteChannel.writeAndFlush(command);
-        }
+        nettyRemoteChannel.writeAndFlush(command);
     }
 
     /**
@@ -112,19 +134,15 @@ public class TaskCallbackService {
      */
     public void sendResult(int taskInstanceId, Command command){
         NettyRemoteChannel nettyRemoteChannel = getRemoteChannel(taskInstanceId);
-        if(nettyRemoteChannel == null){
-            //TODO
-        } else{
-            nettyRemoteChannel.writeAndFlush(command).addListener(new ChannelFutureListener(){
-
-                @Override
-                public void operationComplete(ChannelFuture future) throws Exception {
-                    if(future.isSuccess()){
-                        remove(taskInstanceId);
-                        return;
-                    }
+        nettyRemoteChannel.writeAndFlush(command).addListener(new ChannelFutureListener(){
+
+            @Override
+            public void operationComplete(ChannelFuture future) throws Exception {
+                if(future.isSuccess()){
+                    remove(taskInstanceId);
+                    return;
                 }
-            });
-        }
+            }
+        });
     }
 }

Reply via email to