This is an automated email from the ASF dual-hosted git repository.

lollipop pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 9cd5f6a82 fix: brokerAddr to brokerName mapping is not right when has 
multiple clusters
     new 8504abfaf Merge pull request #4693 from xdkxlk/develop
9cd5f6a82 is described below

commit 9cd5f6a82185b06e20f43b725ad6db86393025f8
Author: kaiyi.lk <[email protected]>
AuthorDate: Tue Jul 26 15:11:39 2022 +0800

    fix: brokerAddr to brokerName mapping is not right when has multiple 
clusters
---
 .../transaction/ClusterTransactionService.java     |  7 +--
 .../transaction/ClusterTransactionServiceTest.java | 60 +++++++++++++++++++++-
 2 files changed, 62 insertions(+), 5 deletions(-)

diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/ClusterTransactionService.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/ClusterTransactionService.java
index 48cdab33d..1d59e1fc8 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/ClusterTransactionService.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/ClusterTransactionService.java
@@ -168,21 +168,22 @@ public class ClusterTransactionService extends 
AbstractTransactionService {
         if (clusterHeartbeatData.isEmpty()) {
             return;
         }
+        Map<String, String> brokerAddrNameMap = new ConcurrentHashMap<>();
         Set<Map.Entry<String, List<HeartbeatData>>> clusterEntry = 
clusterHeartbeatData.entrySet();
         for (Map.Entry<String, List<HeartbeatData>> entry : clusterEntry) {
-            sendHeartBeatToCluster(entry.getKey(), entry.getValue());
+            sendHeartBeatToCluster(entry.getKey(), entry.getValue(), 
brokerAddrNameMap);
         }
+        this.brokerAddrNameMapRef.set(brokerAddrNameMap);
     }
 
     public Map<String, Set<ClusterData>> getGroupClusterData() {
         return groupClusterData;
     }
 
-    protected void sendHeartBeatToCluster(String clusterName, 
List<HeartbeatData> heartbeatDataList) {
+    protected void sendHeartBeatToCluster(String clusterName, 
List<HeartbeatData> heartbeatDataList, Map<String, String> brokerAddrNameMap) {
         if (heartbeatDataList == null) {
             return;
         }
-        Map<String, String> brokerAddrNameMap = new ConcurrentHashMap<>();
         for (HeartbeatData heartbeatData : heartbeatDataList) {
             sendHeartBeatToCluster(clusterName, heartbeatData, 
brokerAddrNameMap);
         }
diff --git 
a/proxy/src/test/java/org/apache/rocketmq/proxy/service/transaction/ClusterTransactionServiceTest.java
 
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/transaction/ClusterTransactionServiceTest.java
index b9706e2af..3f84e972a 100644
--- 
a/proxy/src/test/java/org/apache/rocketmq/proxy/service/transaction/ClusterTransactionServiceTest.java
+++ 
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/transaction/ClusterTransactionServiceTest.java
@@ -18,13 +18,19 @@
 package org.apache.rocketmq.proxy.service.transaction;
 
 import java.time.Duration;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
 import org.apache.rocketmq.broker.client.ProducerManager;
+import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.protocol.heartbeat.HeartbeatData;
 import org.apache.rocketmq.common.protocol.heartbeat.ProducerData;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.QueueData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
 import org.apache.rocketmq.proxy.config.ConfigurationManager;
 import org.apache.rocketmq.proxy.service.BaseServiceTest;
 import org.apache.rocketmq.proxy.service.route.MessageQueueView;
@@ -33,12 +39,14 @@ import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
+import org.mockito.Mockito;
 
 import static org.awaitility.Awaitility.await;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.when;
 
 public class ClusterTransactionServiceTest extends BaseServiceTest {
@@ -100,6 +108,51 @@ public class ClusterTransactionServiceTest extends 
BaseServiceTest {
 
     @Test
     public void testScanProducerHeartBeat() throws Exception {
+        Mockito.reset(this.topicRouteService);
+        String BROKER_NAME2 = "broker-2-01";
+        String CLUSTER_NAME2 = "broker-2";
+        String BROKER_ADDR2 = "127.0.0.2:10911";
+
+        BrokerData brokerData = new BrokerData();
+        QueueData queueData = new QueueData();
+        queueData.setBrokerName(BROKER_NAME2);
+        brokerData.setCluster(CLUSTER_NAME2);
+        brokerData.setBrokerName(BROKER_NAME2);
+        HashMap<Long, String> brokerAddrs = new HashMap<>();
+        brokerAddrs.put(MixAll.MASTER_ID, BROKER_ADDR2);
+        brokerData.setBrokerAddrs(brokerAddrs);
+        topicRouteData.getQueueDatas().add(queueData);
+        topicRouteData.getBrokerDatas().add(brokerData);
+        
when(this.topicRouteService.getAllMessageQueueView(eq(TOPIC))).thenReturn(new 
MessageQueueView(TOPIC, topicRouteData));
+
+        TopicRouteData clusterTopicRouteData = new TopicRouteData();
+        QueueData clusterQueueData = new QueueData();
+        BrokerData clusterBrokerData = new BrokerData();
+
+        clusterQueueData.setBrokerName(BROKER_NAME);
+        
clusterTopicRouteData.setQueueDatas(Lists.newArrayList(clusterQueueData));
+        clusterBrokerData.setCluster(CLUSTER_NAME);
+        clusterBrokerData.setBrokerName(BROKER_NAME);
+        brokerAddrs = new HashMap<>();
+        brokerAddrs.put(MixAll.MASTER_ID, BROKER_ADDR);
+        clusterBrokerData.setBrokerAddrs(brokerAddrs);
+        
clusterTopicRouteData.setBrokerDatas(Lists.newArrayList(clusterBrokerData));
+        
when(this.topicRouteService.getAllMessageQueueView(eq(CLUSTER_NAME))).thenReturn(new
 MessageQueueView(CLUSTER_NAME, clusterTopicRouteData));
+
+        TopicRouteData clusterTopicRouteData2 = new TopicRouteData();
+        QueueData clusterQueueData2 = new QueueData();
+        BrokerData clusterBrokerData2 = new BrokerData();
+
+        clusterQueueData2.setBrokerName(BROKER_NAME2);
+        
clusterTopicRouteData2.setQueueDatas(Lists.newArrayList(clusterQueueData2));
+        clusterBrokerData2.setCluster(CLUSTER_NAME2);
+        clusterBrokerData2.setBrokerName(BROKER_NAME2);
+        brokerAddrs = new HashMap<>();
+        brokerAddrs.put(MixAll.MASTER_ID, BROKER_ADDR2);
+        clusterBrokerData2.setBrokerAddrs(brokerAddrs);
+        
clusterTopicRouteData2.setBrokerDatas(Lists.newArrayList(clusterBrokerData2));
+        
when(this.topicRouteService.getAllMessageQueueView(eq(CLUSTER_NAME2))).thenReturn(new
 MessageQueueView(CLUSTER_NAME2, clusterTopicRouteData2));
+
         
ConfigurationManager.getProxyConfig().setTransactionHeartbeatBatchNum(2);
         this.clusterTransactionService.start();
         Set<String> groupSet = new HashSet<>();
@@ -119,9 +172,10 @@ public class ClusterTransactionServiceTest extends 
BaseServiceTest {
 
         this.clusterTransactionService.scanProducerHeartBeat();
 
-        await().atMost(Duration.ofSeconds(1)).until(() -> 
brokerAddrArgumentCaptor.getAllValues().size() == 2);
+        await().atMost(Duration.ofSeconds(1)).until(() -> 
brokerAddrArgumentCaptor.getAllValues().size() == 4);
 
-        assertEquals(Lists.newArrayList(BROKER_ADDR, BROKER_ADDR), 
brokerAddrArgumentCaptor.getAllValues());
+        assertEquals(Lists.newArrayList(BROKER_ADDR, BROKER_ADDR, 
BROKER_ADDR2, BROKER_ADDR2),
+            
brokerAddrArgumentCaptor.getAllValues().stream().sorted().collect(Collectors.toList()));
         List<HeartbeatData> heartbeatDataList = 
heartbeatDataArgumentCaptor.getAllValues();
         for (ProducerData producerData : 
heartbeatDataList.get(0).getProducerDataSet()) {
             groupSet.remove(producerData.getGroupName());
@@ -132,5 +186,7 @@ public class ClusterTransactionServiceTest extends 
BaseServiceTest {
         }
 
         assertTrue(groupSet.isEmpty());
+        assertEquals(BROKER_NAME2, 
this.clusterTransactionService.getBrokerNameByAddr(BROKER_ADDR2));
+        assertEquals(BROKER_NAME, 
this.clusterTransactionService.getBrokerNameByAddr(BROKER_ADDR));
     }
 }
\ No newline at end of file

Reply via email to