This is an automated email from the ASF dual-hosted git repository.
lollipop pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 9cd5f6a82 fix: brokerAddr to brokerName mapping is not right when has
multiple clusters
new 8504abfaf Merge pull request #4693 from xdkxlk/develop
9cd5f6a82 is described below
commit 9cd5f6a82185b06e20f43b725ad6db86393025f8
Author: kaiyi.lk <[email protected]>
AuthorDate: Tue Jul 26 15:11:39 2022 +0800
fix: brokerAddr to brokerName mapping is not right when has multiple
clusters
---
.../transaction/ClusterTransactionService.java | 7 +--
.../transaction/ClusterTransactionServiceTest.java | 60 +++++++++++++++++++++-
2 files changed, 62 insertions(+), 5 deletions(-)
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/ClusterTransactionService.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/ClusterTransactionService.java
index 48cdab33d..1d59e1fc8 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/ClusterTransactionService.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/ClusterTransactionService.java
@@ -168,21 +168,22 @@ public class ClusterTransactionService extends
AbstractTransactionService {
if (clusterHeartbeatData.isEmpty()) {
return;
}
+ Map<String, String> brokerAddrNameMap = new ConcurrentHashMap<>();
Set<Map.Entry<String, List<HeartbeatData>>> clusterEntry =
clusterHeartbeatData.entrySet();
for (Map.Entry<String, List<HeartbeatData>> entry : clusterEntry) {
- sendHeartBeatToCluster(entry.getKey(), entry.getValue());
+ sendHeartBeatToCluster(entry.getKey(), entry.getValue(),
brokerAddrNameMap);
}
+ this.brokerAddrNameMapRef.set(brokerAddrNameMap);
}
public Map<String, Set<ClusterData>> getGroupClusterData() {
return groupClusterData;
}
- protected void sendHeartBeatToCluster(String clusterName,
List<HeartbeatData> heartbeatDataList) {
+ protected void sendHeartBeatToCluster(String clusterName,
List<HeartbeatData> heartbeatDataList, Map<String, String> brokerAddrNameMap) {
if (heartbeatDataList == null) {
return;
}
- Map<String, String> brokerAddrNameMap = new ConcurrentHashMap<>();
for (HeartbeatData heartbeatData : heartbeatDataList) {
sendHeartBeatToCluster(clusterName, heartbeatData,
brokerAddrNameMap);
}
diff --git
a/proxy/src/test/java/org/apache/rocketmq/proxy/service/transaction/ClusterTransactionServiceTest.java
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/transaction/ClusterTransactionServiceTest.java
index b9706e2af..3f84e972a 100644
---
a/proxy/src/test/java/org/apache/rocketmq/proxy/service/transaction/ClusterTransactionServiceTest.java
+++
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/transaction/ClusterTransactionServiceTest.java
@@ -18,13 +18,19 @@
package org.apache.rocketmq.proxy.service.transaction;
import java.time.Duration;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
import org.apache.rocketmq.broker.client.ProducerManager;
+import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.protocol.heartbeat.HeartbeatData;
import org.apache.rocketmq.common.protocol.heartbeat.ProducerData;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.QueueData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
import org.apache.rocketmq.proxy.service.BaseServiceTest;
import org.apache.rocketmq.proxy.service.route.MessageQueueView;
@@ -33,12 +39,14 @@ import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
+import org.mockito.Mockito;
import static org.awaitility.Awaitility.await;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.when;
public class ClusterTransactionServiceTest extends BaseServiceTest {
@@ -100,6 +108,51 @@ public class ClusterTransactionServiceTest extends
BaseServiceTest {
@Test
public void testScanProducerHeartBeat() throws Exception {
+ Mockito.reset(this.topicRouteService);
+ String BROKER_NAME2 = "broker-2-01";
+ String CLUSTER_NAME2 = "broker-2";
+ String BROKER_ADDR2 = "127.0.0.2:10911";
+
+ BrokerData brokerData = new BrokerData();
+ QueueData queueData = new QueueData();
+ queueData.setBrokerName(BROKER_NAME2);
+ brokerData.setCluster(CLUSTER_NAME2);
+ brokerData.setBrokerName(BROKER_NAME2);
+ HashMap<Long, String> brokerAddrs = new HashMap<>();
+ brokerAddrs.put(MixAll.MASTER_ID, BROKER_ADDR2);
+ brokerData.setBrokerAddrs(brokerAddrs);
+ topicRouteData.getQueueDatas().add(queueData);
+ topicRouteData.getBrokerDatas().add(brokerData);
+
when(this.topicRouteService.getAllMessageQueueView(eq(TOPIC))).thenReturn(new
MessageQueueView(TOPIC, topicRouteData));
+
+ TopicRouteData clusterTopicRouteData = new TopicRouteData();
+ QueueData clusterQueueData = new QueueData();
+ BrokerData clusterBrokerData = new BrokerData();
+
+ clusterQueueData.setBrokerName(BROKER_NAME);
+
clusterTopicRouteData.setQueueDatas(Lists.newArrayList(clusterQueueData));
+ clusterBrokerData.setCluster(CLUSTER_NAME);
+ clusterBrokerData.setBrokerName(BROKER_NAME);
+ brokerAddrs = new HashMap<>();
+ brokerAddrs.put(MixAll.MASTER_ID, BROKER_ADDR);
+ clusterBrokerData.setBrokerAddrs(brokerAddrs);
+
clusterTopicRouteData.setBrokerDatas(Lists.newArrayList(clusterBrokerData));
+
when(this.topicRouteService.getAllMessageQueueView(eq(CLUSTER_NAME))).thenReturn(new
MessageQueueView(CLUSTER_NAME, clusterTopicRouteData));
+
+ TopicRouteData clusterTopicRouteData2 = new TopicRouteData();
+ QueueData clusterQueueData2 = new QueueData();
+ BrokerData clusterBrokerData2 = new BrokerData();
+
+ clusterQueueData2.setBrokerName(BROKER_NAME2);
+
clusterTopicRouteData2.setQueueDatas(Lists.newArrayList(clusterQueueData2));
+ clusterBrokerData2.setCluster(CLUSTER_NAME2);
+ clusterBrokerData2.setBrokerName(BROKER_NAME2);
+ brokerAddrs = new HashMap<>();
+ brokerAddrs.put(MixAll.MASTER_ID, BROKER_ADDR2);
+ clusterBrokerData2.setBrokerAddrs(brokerAddrs);
+
clusterTopicRouteData2.setBrokerDatas(Lists.newArrayList(clusterBrokerData2));
+
when(this.topicRouteService.getAllMessageQueueView(eq(CLUSTER_NAME2))).thenReturn(new
MessageQueueView(CLUSTER_NAME2, clusterTopicRouteData2));
+
ConfigurationManager.getProxyConfig().setTransactionHeartbeatBatchNum(2);
this.clusterTransactionService.start();
Set<String> groupSet = new HashSet<>();
@@ -119,9 +172,10 @@ public class ClusterTransactionServiceTest extends
BaseServiceTest {
this.clusterTransactionService.scanProducerHeartBeat();
- await().atMost(Duration.ofSeconds(1)).until(() ->
brokerAddrArgumentCaptor.getAllValues().size() == 2);
+ await().atMost(Duration.ofSeconds(1)).until(() ->
brokerAddrArgumentCaptor.getAllValues().size() == 4);
- assertEquals(Lists.newArrayList(BROKER_ADDR, BROKER_ADDR),
brokerAddrArgumentCaptor.getAllValues());
+ assertEquals(Lists.newArrayList(BROKER_ADDR, BROKER_ADDR,
BROKER_ADDR2, BROKER_ADDR2),
+
brokerAddrArgumentCaptor.getAllValues().stream().sorted().collect(Collectors.toList()));
List<HeartbeatData> heartbeatDataList =
heartbeatDataArgumentCaptor.getAllValues();
for (ProducerData producerData :
heartbeatDataList.get(0).getProducerDataSet()) {
groupSet.remove(producerData.getGroupName());
@@ -132,5 +186,7 @@ public class ClusterTransactionServiceTest extends
BaseServiceTest {
}
assertTrue(groupSet.isEmpty());
+ assertEquals(BROKER_NAME2,
this.clusterTransactionService.getBrokerNameByAddr(BROKER_ADDR2));
+ assertEquals(BROKER_NAME,
this.clusterTransactionService.getBrokerNameByAddr(BROKER_ADDR));
}
}
\ No newline at end of file