vongosling closed pull request #78: [ROCKETMQ-141]Make producer client connect 
to new-joined brokers eagerly
URL: https://github.com/apache/rocketmq/pull/78
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java 
b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
index 69ebfefd8..950d75625 100644
--- a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
+++ b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
@@ -32,7 +32,7 @@
     /**
      * Pulling topic information interval from the named server
      */
-    private int pollNameServerInteval = 1000 * 30;
+    private int pollNameServerInterval = 1000 * 30;
     /**
      * Heartbeat interval in microseconds with message broker
      */
@@ -86,7 +86,7 @@ public void resetClientConfig(final ClientConfig cc) {
         this.clientIP = cc.clientIP;
         this.instanceName = cc.instanceName;
         this.clientCallbackExecutorThreads = cc.clientCallbackExecutorThreads;
-        this.pollNameServerInteval = cc.pollNameServerInteval;
+        this.pollNameServerInterval = cc.pollNameServerInterval;
         this.heartbeatBrokerInterval = cc.heartbeatBrokerInterval;
         this.persistConsumerOffsetInterval = cc.persistConsumerOffsetInterval;
         this.unitMode = cc.unitMode;
@@ -100,7 +100,7 @@ public ClientConfig cloneClientConfig() {
         cc.clientIP = clientIP;
         cc.instanceName = instanceName;
         cc.clientCallbackExecutorThreads = clientCallbackExecutorThreads;
-        cc.pollNameServerInteval = pollNameServerInteval;
+        cc.pollNameServerInterval = pollNameServerInterval;
         cc.heartbeatBrokerInterval = heartbeatBrokerInterval;
         cc.persistConsumerOffsetInterval = persistConsumerOffsetInterval;
         cc.unitMode = unitMode;
@@ -125,12 +125,12 @@ public void setClientCallbackExecutorThreads(int 
clientCallbackExecutorThreads)
         this.clientCallbackExecutorThreads = clientCallbackExecutorThreads;
     }
 
-    public int getPollNameServerInteval() {
-        return pollNameServerInteval;
+    public int getPollNameServerInterval() {
+        return pollNameServerInterval;
     }
 
-    public void setPollNameServerInteval(int pollNameServerInteval) {
-        this.pollNameServerInteval = pollNameServerInteval;
+    public void setPollNameServerInterval(int pollNameServerInterval) {
+        this.pollNameServerInterval = pollNameServerInterval;
     }
 
     public int getHeartbeatBrokerInterval() {
@@ -176,7 +176,7 @@ public void setVipChannelEnabled(final boolean 
vipChannelEnabled) {
     @Override
     public String toString() {
         return "ClientConfig [namesrvAddr=" + namesrvAddr + ", clientIP=" + 
clientIP + ", instanceName=" + instanceName
-            + ", clientCallbackExecutorThreads=" + 
clientCallbackExecutorThreads + ", pollNameServerInteval=" + 
pollNameServerInteval
+            + ", clientCallbackExecutorThreads=" + 
clientCallbackExecutorThreads + ", pollNameServerInterval=" + 
pollNameServerInterval
             + ", heartbeatBrokerInterval=" + heartbeatBrokerInterval + ", 
persistConsumerOffsetInterval="
             + persistConsumerOffsetInterval + ", unitMode=" + unitMode + ", 
unitName=" + unitName + ", vipChannelEnabled="
             + vipChannelEnabled + "]";
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
 
b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index ebcfb6261..80600b6c8 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@ -18,6 +18,7 @@
 
 import java.io.UnsupportedEncodingException;
 import java.net.DatagramSocket;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -275,7 +276,7 @@ public void run() {
                     log.error("ScheduledTask 
updateTopicRouteInfoFromNameServer exception", e);
                 }
             }
-        }, 10, this.clientConfig.getPollNameServerInteval(), 
TimeUnit.MILLISECONDS);
+        }, 10, this.clientConfig.getPollNameServerInterval(), 
TimeUnit.MILLISECONDS);
 
         this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
 
@@ -322,6 +323,25 @@ public String getClientId() {
     public void updateTopicRouteInfoFromNameServer() {
         Set<String> topicList = new HashSet<String>();
 
+        // Producer
+        {
+            Iterator<Entry<String, MQProducerInner>> it = 
this.producerTable.entrySet().iterator();
+            while (it.hasNext()) {
+                Entry<String, MQProducerInner> entry = it.next();
+                MQProducerInner impl = entry.getValue();
+                if (impl != null) {
+                    Set<String> lst = impl.getPublishTopicList();
+                    topicList.addAll(lst);
+                }
+            }
+        }
+
+        // Producers need eager connection
+        for (String topic : topicList) {
+            this.updateTopicRouteInfoFromNameServer(topic, true);
+        }
+        topicList.clear();
+
         // Consumer
         {
             Iterator<Entry<String, MQConsumerInner>> it = 
this.consumerTable.entrySet().iterator();
@@ -339,21 +359,8 @@ public void updateTopicRouteInfoFromNameServer() {
             }
         }
 
-        // Producer
-        {
-            Iterator<Entry<String, MQProducerInner>> it = 
this.producerTable.entrySet().iterator();
-            while (it.hasNext()) {
-                Entry<String, MQProducerInner> entry = it.next();
-                MQProducerInner impl = entry.getValue();
-                if (impl != null) {
-                    Set<String> lst = impl.getPublishTopicList();
-                    topicList.addAll(lst);
-                }
-            }
-        }
-
         for (String topic : topicList) {
-            this.updateTopicRouteInfoFromNameServer(topic);
+            this.updateTopicRouteInfoFromNameServer(topic, false);
         }
     }
 
@@ -445,6 +452,10 @@ public void adjustThreadPool() {
         }
     }
 
+    public boolean updateTopicRouteInfoFromNameServer(final String topic, 
final boolean connectImmediately) {
+        return updateTopicRouteInfoFromNameServer(topic, false, null, 
connectImmediately);
+    }
+
     public boolean updateTopicRouteInfoFromNameServer(final String topic) {
         return updateTopicRouteInfoFromNameServer(topic, false, null);
     }
@@ -536,7 +547,13 @@ private void uploadFilterClassSource() {
         }
     }
 
-    public boolean updateTopicRouteInfoFromNameServer(final String topic, 
boolean isDefault, DefaultMQProducer defaultMQProducer) {
+    public boolean updateTopicRouteInfoFromNameServer(final String topic, 
boolean isDefault,
+        DefaultMQProducer defaultMQProducer) {
+        return updateTopicRouteInfoFromNameServer(topic, isDefault, 
defaultMQProducer, false);
+    }
+
+    public boolean updateTopicRouteInfoFromNameServer(final String topic, 
boolean isDefault,
+        DefaultMQProducer defaultMQProducer, boolean connectImmediately) {
         try {
             if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, 
TimeUnit.MILLISECONDS)) {
                 try {
@@ -567,6 +584,11 @@ public boolean updateTopicRouteInfoFromNameServer(final 
String topic, boolean is
                             TopicRouteData cloneTopicRouteData = 
topicRouteData.cloneTopicRouteData();
 
                             for (BrokerData bd : 
topicRouteData.getBrokerDatas()) {
+                                // Establish connection to new-join brokers.
+                                if (connectImmediately) {
+                                    connect(bd);
+                                }
+
                                 this.brokerAddrTable.put(bd.getBrokerName(), 
bd.getBrokerAddrs());
                             }
 
@@ -620,6 +642,37 @@ public boolean updateTopicRouteInfoFromNameServer(final 
String topic, boolean is
         return false;
     }
 
+    private void connect(BrokerData brokerData) {
+
+        if (null == brokerData || null == brokerData.getBrokerAddrs() || 
brokerData.getBrokerAddrs().isEmpty()) {
+            return;
+        }
+
+        Map<Long, String> existingBrokers = 
this.brokerAddrTable.get(brokerData.getBrokerName());
+
+        List<String> brokerAddresses = new ArrayList<String>();
+        if (null == existingBrokers) {
+            brokerAddresses.addAll(brokerData.getBrokerAddrs().values());
+        } else {
+            Set<String> existingBrokerAddresses = new HashSet<String>();
+            existingBrokerAddresses.addAll(existingBrokers.values());
+            for (Map.Entry<Long, String> next : 
brokerData.getBrokerAddrs().entrySet()) {
+                if (!existingBrokerAddresses.contains(next.getValue())) {
+                    brokerAddresses.add(next.getValue());
+                }
+            }
+        }
+
+        for (String brokerAddress : brokerAddresses) {
+            boolean successful = 
this.mQClientAPIImpl.getRemotingClient().connect(brokerAddress);
+            if (successful) {
+                log.info("Eagerly established a connection to: {}", 
brokerAddress);
+            } else {
+                log.warn("Failed to eagerly establish a connection to: {}", 
brokerAddress);
+            }
+        }
+    }
+
     private HeartbeatData prepareHeartbeatData() {
         HeartbeatData heartbeatData = new HeartbeatData();
 
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
 
b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index 8e819792c..5aa87d956 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -329,7 +329,7 @@ public void updateTopicPublishInfo(final String topic, 
final TopicPublishInfo in
         if (info != null && topic != null) {
             TopicPublishInfo prev = this.topicPublishInfoTable.put(topic, 
info);
             if (prev != null) {
-                log.info("updateTopicPublishInfo prev is not null, " + 
prev.toString());
+                log.info("#updateTopicPublishInfo for topic: {}, previous 
topicPublishInfo is: {}", topic, prev);
             }
         }
     }
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java 
b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java
index 276a56539..bd96316cd 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java
@@ -27,24 +27,26 @@
 
 public interface RemotingClient extends RemotingService {
 
-    public void updateNameServerAddressList(final List<String> addrs);
+    void updateNameServerAddressList(final List<String> addrs);
 
-    public List<String> getNameServerAddressList();
+    List<String> getNameServerAddressList();
 
-    public RemotingCommand invokeSync(final String addr, final RemotingCommand 
request,
+    RemotingCommand invokeSync(final String addr, final RemotingCommand 
request,
         final long timeoutMillis) throws InterruptedException, 
RemotingConnectException,
         RemotingSendRequestException, RemotingTimeoutException;
 
-    public void invokeAsync(final String addr, final RemotingCommand request, 
final long timeoutMillis,
+    void invokeAsync(final String addr, final RemotingCommand request, final 
long timeoutMillis,
         final InvokeCallback invokeCallback) throws InterruptedException, 
RemotingConnectException,
         RemotingTooMuchRequestException, RemotingTimeoutException, 
RemotingSendRequestException;
 
-    public void invokeOneway(final String addr, final RemotingCommand request, 
final long timeoutMillis)
+    void invokeOneway(final String addr, final RemotingCommand request, final 
long timeoutMillis)
         throws InterruptedException, RemotingConnectException, 
RemotingTooMuchRequestException,
         RemotingTimeoutException, RemotingSendRequestException;
 
-    public void registerProcessor(final int requestCode, final 
NettyRequestProcessor processor,
+    void registerProcessor(final int requestCode, final NettyRequestProcessor 
processor,
         final ExecutorService executor);
 
-    public boolean isChannelWriteable(final String addr);
+    boolean isChannelWritable(final String addr);
+
+    boolean connect(final String address);
 }
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
index 85f9244d2..6927022c9 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
@@ -527,7 +527,7 @@ public void registerProcessor(int requestCode, 
NettyRequestProcessor processor,
     }
 
     @Override
-    public boolean isChannelWriteable(String addr) {
+    public boolean isChannelWritable(String addr) {
         ChannelWrapper cw = this.channelTables.get(addr);
         if (cw != null && cw.isOK()) {
             return cw.isWriteable();
@@ -535,6 +535,18 @@ public boolean isChannelWriteable(String addr) {
         return true;
     }
 
+    @Override
+    public boolean connect(String address) {
+        try {
+            Channel channel = getAndCreateChannel(address);
+            if (null != channel) {
+                return true;
+            }
+        } catch (InterruptedException ignore) {
+        }
+        return false;
+    }
+
     @Override
     public List<String> getNameServerAddressList() {
         return this.namesrvAddrList.get();


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to