vongosling closed pull request #115: [ROCKETMQ-204]-when all brokers is 
offline, client still attempts to send heartbeat
URL: https://github.com/apache/rocketmq/pull/115
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java 
b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
index 983e5157e..409a58f8d 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
@@ -79,6 +79,10 @@ public void createTopic(String key, String newTopic, int 
queueNum) throws MQClie
     public void createTopic(String key, String newTopic, int queueNum, int 
topicSysFlag) throws MQClientException {
         try {
             TopicRouteData topicRouteData = 
this.mQClientFactory.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(key, 
timeoutMillis);
+            if (topicRouteData == null) { // auto create topic key does not 
exist
+                throw new MQClientException("Not found broker for auto create 
topic key " + key, null);
+            }
+
             List<BrokerData> brokerDataList = topicRouteData.getBrokerDatas();
             if (brokerDataList != null && !brokerDataList.isEmpty()) {
                 Collections.sort(brokerDataList);
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java 
b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index 4244bddcf..2878cedd9 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -1189,7 +1189,7 @@ public TopicRouteData 
getDefaultTopicRouteInfoFromNameServer(final String topic,
         switch (response.getCode()) {
             case ResponseCode.TOPIC_NOT_EXIST: {
                 // TODO LOG
-                break;
+                return null;
             }
             case ResponseCode.SUCCESS: {
                 byte[] body = response.getBody();
@@ -1217,7 +1217,7 @@ public TopicRouteData 
getTopicRouteInfoFromNameServer(final String topic, final
             case ResponseCode.TOPIC_NOT_EXIST: {
                 if (!topic.equals(MixAll.DEFAULT_TOPIC))
                     log.warn("get Topic [{}] RouteInfoFromNameServer is not 
exist value", topic);
-                break;
+                return null;
             }
             case ResponseCode.SUCCESS: {
                 byte[] body = response.getBody();
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
index 7d43b372e..7087a5b31 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
@@ -324,11 +324,16 @@ public void persistConsumerOffset() {
 
     @Override
     public void updateTopicSubscribeInfo(String topic, Set<MessageQueue> info) 
{
-        Map<String, SubscriptionData> subTable = 
this.rebalanceImpl.getSubscriptionInner();
-        if (subTable != null) {
-            if (subTable.containsKey(topic)) {
-                this.rebalanceImpl.getTopicSubscribeInfoTable().put(topic, 
info);
+        if (info != null) {
+            Map<String, SubscriptionData> subTable = 
this.rebalanceImpl.getSubscriptionInner();
+            if (subTable != null) {
+                if (subTable.containsKey(topic)) {
+                    this.rebalanceImpl.getTopicSubscribeInfoTable().put(topic, 
info);
+                }
             }
+        } else {
+            Set<MessageQueue> prev = 
this.rebalanceImpl.getTopicSubscribeInfoTable().remove(topic);
+            log.info("instanceName={}, group={}, topicSubscribeInfoTable of 
topic {} is removed, {}, prev = {}", defaultMQPullConsumer.getInstanceName(), 
defaultMQPullConsumer.getConsumerGroup(), topic, prev);
         }
     }
 
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
index 876796424..db628b79b 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
@@ -963,11 +963,16 @@ public void persistConsumerOffset() {
 
     @Override
     public void updateTopicSubscribeInfo(String topic, Set<MessageQueue> info) 
{
-        Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
-        if (subTable != null) {
-            if (subTable.containsKey(topic)) {
-                this.rebalanceImpl.topicSubscribeInfoTable.put(topic, info);
+        if (info != null) {
+            Map<String, SubscriptionData> subTable = 
this.getSubscriptionInner();
+            if (subTable != null) {
+                if (subTable.containsKey(topic)) {
+                    this.rebalanceImpl.topicSubscribeInfoTable.put(topic, 
info);
+                }
             }
+        } else {
+            Set<MessageQueue> prev = 
this.rebalanceImpl.topicSubscribeInfoTable.remove(topic);
+            log.info("instanceName={}, group={}, topicSubscribeInfoTable of 
topic {} is removed, {}, prev = {}", defaultMQPushConsumer.getInstanceName(), 
defaultMQPushConsumer.getConsumerGroup(), topic, prev);
         }
     }
 
@@ -1075,9 +1080,11 @@ private long computeAccumulationTotal() {
         throws RemotingException, MQClientException, InterruptedException, 
MQBrokerException {
         List<QueueTimeSpan> queueTimeSpan = new ArrayList<QueueTimeSpan>();
         TopicRouteData routeData = 
this.mQClientFactory.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(topic,
 3000);
-        for (BrokerData brokerData : routeData.getBrokerDatas()) {
-            String addr = brokerData.selectBrokerAddr();
-            
queueTimeSpan.addAll(this.mQClientFactory.getMQClientAPIImpl().queryConsumeTimeSpan(addr,
 topic, groupName(), 3000));
+        if (routeData != null) {
+            for (BrokerData brokerData : routeData.getBrokerDatas()) {
+                String addr = brokerData.selectBrokerAddr();
+                
queueTimeSpan.addAll(this.mQClientFactory.getMQClientAPIImpl().queryConsumeTimeSpan(addr,
 topic, groupName(), 3000));
+            }
         }
 
         return queueTimeSpan;
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
 
b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index 1b075ee11..056735bbe 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@ -621,34 +621,22 @@ public boolean updateTopicRouteInfoFromNameServer(final 
String topic, boolean is
                             {
                                 TopicPublishInfo publishInfo = 
topicRouteData2TopicPublishInfo(topic, topicRouteData);
                                 publishInfo.setHaveTopicRouterInfo(true);
-                                Iterator<Entry<String, MQProducerInner>> it = 
this.producerTable.entrySet().iterator();
-                                while (it.hasNext()) {
-                                    Entry<String, MQProducerInner> entry = 
it.next();
-                                    MQProducerInner impl = entry.getValue();
-                                    if (impl != null) {
-                                        impl.updateTopicPublishInfo(topic, 
publishInfo);
-                                    }
-                                }
+                                updatePubInfoTable(topic, publishInfo);
                             }
 
                             // Update sub info
                             {
                                 Set<MessageQueue> subscribeInfo = 
topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
-                                Iterator<Entry<String, MQConsumerInner>> it = 
this.consumerTable.entrySet().iterator();
-                                while (it.hasNext()) {
-                                    Entry<String, MQConsumerInner> entry = 
it.next();
-                                    MQConsumerInner impl = entry.getValue();
-                                    if (impl != null) {
-                                        impl.updateTopicSubscribeInfo(topic, 
subscribeInfo);
-                                    }
-                                }
+                                updateSubInfoTable(topic, subscribeInfo);
                             }
                             log.info("topicRouteTable.put. Topic = {}, 
TopicRouteData[{}]", topic, cloneTopicRouteData);
                             this.topicRouteTable.put(topic, 
cloneTopicRouteData);
                             return true;
                         }
                     } else {
-                        log.warn("updateTopicRouteInfoFromNameServer, 
getTopicRouteInfoFromNameServer return null, Topic: {}", topic);
+                        updatePubInfoTable(topic, null);
+                        updateSubInfoTable(topic, null);
+                        this.topicRouteTable.remove(topic);
                     }
                 } catch (Exception e) {
                     if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && 
!topic.equals(MixAll.DEFAULT_TOPIC)) {
@@ -667,6 +655,28 @@ public boolean updateTopicRouteInfoFromNameServer(final 
String topic, boolean is
         return false;
     }
 
+    private void updateSubInfoTable(String topic, Set<MessageQueue> 
subscribeInfo) {
+        Iterator<Entry<String, MQConsumerInner>> it = 
this.consumerTable.entrySet().iterator();
+        while (it.hasNext()) {
+            Entry<String, MQConsumerInner> entry = it.next();
+            MQConsumerInner impl = entry.getValue();
+            if (impl != null) {
+                impl.updateTopicSubscribeInfo(topic, subscribeInfo);
+            }
+        }
+    }
+
+    private void updatePubInfoTable(String topic, TopicPublishInfo 
publishInfo) {
+        Iterator<Entry<String, MQProducerInner>> it = 
this.producerTable.entrySet().iterator();
+        while (it.hasNext()) {
+            Entry<String, MQProducerInner> entry = it.next();
+            MQProducerInner impl = entry.getValue();
+            if (impl != null) {
+                impl.updateTopicPublishInfo(topic, publishInfo);
+            }
+        }
+    }
+
     private HeartbeatData prepareHeartbeatData() {
         HeartbeatData heartbeatData = new HeartbeatData();
 
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
 
b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index d828875d3..5da20e990 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -332,6 +332,9 @@ public void updateTopicPublishInfo(final String topic, 
final TopicPublishInfo in
             if (prev != null) {
                 log.info("updateTopicPublishInfo prev is not null, " + 
prev.toString());
             }
+        } else if (info == null) { //remove
+            TopicPublishInfo prev = this.topicPublishInfoTable.remove(topic);
+            log.info("TopicPublishInfo for topic {} is remove, prev = {} ", 
topic, prev.toString());
         }
     }
 
@@ -569,7 +572,7 @@ private TopicPublishInfo tryToFindTopicPublishInfo(final 
String topic) {
             topicPublishInfo = this.topicPublishInfoTable.get(topic);
         }
 
-        if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) 
{
+        if (topicPublishInfo != null && 
(topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok())) {
             return topicPublishInfo;
         } else {
             this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, 
true, this.defaultMQProducer);
diff --git a/client/src/test/java/org/apache/rocketmq/client/TestUtil.java 
b/client/src/test/java/org/apache/rocketmq/client/TestUtil.java
new file mode 100644
index 000000000..3653eb953
--- /dev/null
+++ b/client/src/test/java/org/apache/rocketmq/client/TestUtil.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.QueueData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+
+/**
+ * Some util method using when running test
+ */
+public final class TestUtil {
+    private TestUtil() {
+    }
+
+    public static TopicRouteData createTestTopicRouteData() {
+        TopicRouteData topicRouteData = new TopicRouteData();
+
+        topicRouteData.setFilterServerTable(new HashMap<String, 
List<String>>());
+        List<BrokerData> brokerDataList = new ArrayList<BrokerData>();
+        BrokerData brokerData = new BrokerData();
+        brokerData.setBrokerName("BrokerA");
+        brokerData.setCluster("DefaultCluster");
+        HashMap<Long, String> brokerAddrs = new HashMap<Long, String>();
+        brokerAddrs.put(0L, "127.0.0.1:10911");
+        brokerData.setBrokerAddrs(brokerAddrs);
+        brokerDataList.add(brokerData);
+        topicRouteData.setBrokerDatas(brokerDataList);
+
+        List<QueueData> queueDataList = new ArrayList<QueueData>();
+        QueueData queueData = new QueueData();
+        queueData.setBrokerName("BrokerA");
+        queueData.setPerm(6);
+        queueData.setReadQueueNums(3);
+        queueData.setWriteQueueNums(4);
+        queueData.setTopicSynFlag(0);
+        queueDataList.add(queueData);
+        topicRouteData.setQueueDatas(queueDataList);
+        return topicRouteData;
+    }
+
+}
diff --git 
a/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java
 
b/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java
index 7e0b4f934..b600ffc06 100644
--- 
a/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java
+++ 
b/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java
@@ -16,57 +16,62 @@
  */
 package org.apache.rocketmq.client.impl.factory;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
+import java.lang.reflect.Field;
 import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.TestUtil;
 import org.apache.rocketmq.client.admin.MQAdminExtInner;
 import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.impl.MQClientAPIImpl;
 import org.apache.rocketmq.client.impl.MQClientManager;
 import org.apache.rocketmq.client.impl.consumer.MQConsumerInner;
 import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
 import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
-import org.apache.rocketmq.common.protocol.route.BrokerData;
-import org.apache.rocketmq.common.protocol.route.QueueData;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.mockito.Spy;
 import org.mockito.junit.MockitoJUnitRunner;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 
 @RunWith(MockitoJUnitRunner.class)
 public class MQClientInstanceTest {
+    @Spy
     private MQClientInstance mqClientInstance =  
MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
     private String topic = "FooBar";
     private String group = "FooBarGroup";
 
+
+    @Test
+    public void testUpdateTopicRouteInfo() throws Exception {
+        //mock
+        MQClientAPIImpl mqClientAPI = mock(MQClientAPIImpl.class);
+
+        doReturn(TestUtil.createTestTopicRouteData())
+            .doReturn(null)// second time, it will return null
+            .when(mqClientAPI).getTopicRouteInfoFromNameServer(topic, 3000);
+
+        //use spy api impl
+        Field field = 
MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
+        field.setAccessible(true);
+        field.set(mqClientInstance, mqClientAPI);
+
+        //firstly, update data from name server
+        mqClientInstance.updateTopicRouteInfoFromNameServer(topic);
+        
assertThat(mqClientInstance.getTopicRouteTable().get(topic)).isNotNull();
+
+
+        //update again, the data should be null, assert that the 
topicRouteInfo is set to empty
+        mqClientInstance.updateTopicRouteInfoFromNameServer(topic);
+        assertThat(mqClientInstance.getTopicRouteTable().get(topic)).isNull();
+    }
+
     @Test
     public void testTopicRouteData2TopicPublishInfo() {
-        TopicRouteData topicRouteData = new TopicRouteData();
-
-        topicRouteData.setFilterServerTable(new HashMap<String, 
List<String>>());
-        List<BrokerData> brokerDataList = new ArrayList<BrokerData>();
-        BrokerData brokerData = new BrokerData();
-        brokerData.setBrokerName("BrokerA");
-        brokerData.setCluster("DefaultCluster");
-        HashMap<Long, String> brokerAddrs = new HashMap<Long, String>();
-        brokerAddrs.put(0L, "127.0.0.1:10911");
-        brokerData.setBrokerAddrs(brokerAddrs);
-        brokerDataList.add(brokerData);
-        topicRouteData.setBrokerDatas(brokerDataList);
-
-        List<QueueData> queueDataList = new ArrayList<QueueData>();
-        QueueData queueData = new QueueData();
-        queueData.setBrokerName("BrokerA");
-        queueData.setPerm(6);
-        queueData.setReadQueueNums(3);
-        queueData.setWriteQueueNums(4);
-        queueData.setTopicSynFlag(0);
-        queueDataList.add(queueData);
-        topicRouteData.setQueueDatas(queueDataList);
+        TopicRouteData topicRouteData = TestUtil.createTestTopicRouteData();
 
         TopicPublishInfo topicPublishInfo = 
MQClientInstance.topicRouteData2TopicPublishInfo(topic, topicRouteData);
 
diff --git 
a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
 
b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
index 2f035e0fc..0bc295817 100644
--- 
a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
+++ 
b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
@@ -19,10 +19,9 @@
 import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.TestUtil;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.hook.SendMessageContext;
@@ -35,8 +34,6 @@
 import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
 import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
-import org.apache.rocketmq.common.protocol.route.BrokerData;
-import org.apache.rocketmq.common.protocol.route.QueueData;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.junit.After;
@@ -191,29 +188,7 @@ public void testSendMessageSync_SuccessWithHook() throws 
Throwable {
     }
 
     public static TopicRouteData createTopicRoute() {
-        TopicRouteData topicRouteData = new TopicRouteData();
-
-        topicRouteData.setFilterServerTable(new HashMap<String, 
List<String>>());
-        List<BrokerData> brokerDataList = new ArrayList<BrokerData>();
-        BrokerData brokerData = new BrokerData();
-        brokerData.setBrokerName("BrokerA");
-        brokerData.setCluster("DefaultCluster");
-        HashMap<Long, String> brokerAddrs = new HashMap<Long, String>();
-        brokerAddrs.put(0L, "127.0.0.1:10911");
-        brokerData.setBrokerAddrs(brokerAddrs);
-        brokerDataList.add(brokerData);
-        topicRouteData.setBrokerDatas(brokerDataList);
-
-        List<QueueData> queueDataList = new ArrayList<QueueData>();
-        QueueData queueData = new QueueData();
-        queueData.setBrokerName("BrokerA");
-        queueData.setPerm(6);
-        queueData.setReadQueueNums(3);
-        queueData.setWriteQueueNums(4);
-        queueData.setTopicSynFlag(0);
-        queueDataList.add(queueData);
-        topicRouteData.setQueueDatas(queueDataList);
-        return topicRouteData;
+        return TestUtil.createTestTopicRouteData();
     }
 
     private SendResult createSendResult(SendStatus sendStatus) {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to