This is an automated email from the ASF dual-hosted git repository.
vongosling pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new d5cb67f [ISSUE #2165] Slave read enable not work sometimes When
cluster deployed on DLedger mode (#2167)
d5cb67f is described below
commit d5cb67ff802c5d92ba8b42f0a0ebe94a05eb9965
Author: 张旭 <[email protected]>
AuthorDate: Thu Oct 15 11:04:19 2020 +0800
[ISSUE #2165] Slave read enable not work sometimes When cluster deployed on
DLedger mode (#2167)
* [Client] Fix slaveReadEnable=true sometimes not taking effect when the
cluster is deployed in DLedger mode
* [Client] Add unit test for findBrokerAddressInSubscribe
Co-authored-by: zhangxu16 <[email protected]>
---
.../client/impl/factory/MQClientInstance.java | 5 +++
.../client/impl/factory/MQClientInstanceTest.java | 39 ++++++++++++++++++++++
2 files changed, 44 insertions(+)
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index 48cc188..b5aaeb8 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@ -1043,6 +1043,11 @@ public class MQClientInstance {
slave = brokerId != MixAll.MASTER_ID;
found = brokerAddr != null;
+ if (!found && slave) {
+ brokerAddr = map.get(brokerId + 1);
+ found = brokerAddr != null;
+ }
+
if (!found && !onlyThisBroker) {
Entry<Long, String> entry = map.entrySet().iterator().next();
brokerAddr = entry.getValue();
diff --git
a/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java
b/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java
index bb21321..e0506aa 100644
---
a/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java
+++
b/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java
@@ -19,9 +19,12 @@ package org.apache.rocketmq.client.impl.factory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.admin.MQAdminExtInner;
import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.impl.FindBrokerResult;
import org.apache.rocketmq.client.impl.MQClientManager;
import org.apache.rocketmq.client.impl.consumer.MQConsumerInner;
import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
@@ -30,8 +33,10 @@ import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
+import org.mockito.internal.util.reflection.FieldSetter;
import org.mockito.junit.MockitoJUnitRunner;
import static org.assertj.core.api.Assertions.assertThat;
@@ -42,6 +47,12 @@ public class MQClientInstanceTest {
private MQClientInstance mqClientInstance =
MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
private String topic = "FooBar";
private String group = "FooBarGroup";
+ private ConcurrentMap<String, HashMap<Long, String>> brokerAddrTable = new
ConcurrentHashMap<String, HashMap<Long, String>>();
+
+ @Before
+ public void init() throws Exception {
+ FieldSetter.setField(mqClientInstance,
MQClientInstance.class.getDeclaredField("brokerAddrTable"), brokerAddrTable);
+ }
@Test
public void testTopicRouteData2TopicPublishInfo() {
@@ -75,6 +86,34 @@ public class MQClientInstanceTest {
}
@Test
+ public void testFindBrokerAddressInSubscribe() {
+ // dledger normal case
+ String brokerName = "BrokerA";
+ HashMap<Long, String> addrMap = new HashMap<Long, String>();
+ addrMap.put(0L, "127.0.0.1:10911");
+ addrMap.put(1L, "127.0.0.1:10912");
+ addrMap.put(2L, "127.0.0.1:10913");
+ brokerAddrTable.put(brokerName, addrMap);
+ long brokerId = 1;
+ FindBrokerResult brokerResult =
mqClientInstance.findBrokerAddressInSubscribe(brokerName, brokerId, false);
+ assertThat(brokerResult).isNotNull();
+ assertThat(brokerResult.getBrokerAddr()).isEqualTo("127.0.0.1:10912");
+ assertThat(brokerResult.isSlave()).isTrue();
+
+ // dledger case, when node n0 was voted as the leader
+ brokerName = "BrokerB";
+ HashMap<Long, String> addrMapNew = new HashMap<Long, String>();
+ addrMapNew.put(0L, "127.0.0.1:10911");
+ addrMapNew.put(2L, "127.0.0.1:10912");
+ addrMapNew.put(3L, "127.0.0.1:10913");
+ brokerAddrTable.put(brokerName, addrMapNew);
+ brokerResult =
mqClientInstance.findBrokerAddressInSubscribe(brokerName, brokerId, false);
+ assertThat(brokerResult).isNotNull();
+ assertThat(brokerResult.getBrokerAddr()).isEqualTo("127.0.0.1:10912");
+ assertThat(brokerResult.isSlave()).isTrue();
+ }
+
+ @Test
public void testRegisterProducer() {
boolean flag = mqClientInstance.registerProducer(group,
mock(DefaultMQProducerImpl.class));
assertThat(flag).isTrue();