This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new acd56fc510 [ISSUE #7958] Fix proxy always return the first broker in findOneBroker (#7960)
acd56fc510 is described below
commit acd56fc5105cbed827d4094d87471ea9f6801634
Author: Lei Zhiyuan <[email protected]>
AuthorDate: Sat Apr 6 16:31:51 2024 +0800
[ISSUE #7958] Fix proxy always return the first broker in findOneBroker (#7960)
---
.../service/metadata/ClusterMetadataService.java | 8 ++++-
.../metadata/ClusterMetadataServiceTest.java | 37 ++++++++++++++++++++++
2 files changed, 44 insertions(+), 1 deletion(-)
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/metadata/ClusterMetadataService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/metadata/ClusterMetadataService.java
index 226adeb6ec..70ce1d3480 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/metadata/ClusterMetadataService.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/metadata/ClusterMetadataService.java
@@ -19,7 +19,9 @@ package org.apache.rocketmq.proxy.service.metadata;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.LoadingCache;
+import java.util.List;
import java.util.Optional;
+import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -70,6 +72,8 @@ public class ClusterMetadataService extends AbstractStartAndShutdown implements
protected final static Acl EMPTY_ACL = new Acl();
+ protected final Random random = new Random();
+
public ClusterMetadataService(TopicRouteService topicRouteService,
MQClientAPIFactory mqClientAPIFactory) {
this.topicRouteService = topicRouteService;
@@ -274,7 +278,9 @@ public class ClusterMetadataService extends AbstractStartAndShutdown implements
protected Optional<BrokerData> findOneBroker(String topic) throws
Exception {
try {
- return
topicRouteService.getAllMessageQueueView(ProxyContext.createForInner(this.getClass()),
topic).getTopicRouteData().getBrokerDatas().stream().findAny();
+ List<BrokerData> brokerDatas =
topicRouteService.getAllMessageQueueView(ProxyContext.createForInner(this.getClass()),
topic).getTopicRouteData().getBrokerDatas();
+ int skipNum = random.nextInt(brokerDatas.size());
+ return brokerDatas.stream().skip(skipNum).findFirst();
} catch (Exception e) {
if (TopicRouteHelper.isTopicNotExistError(e)) {
return Optional.empty();
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/metadata/ClusterMetadataServiceTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/metadata/ClusterMetadataServiceTest.java
index 98bf1104f8..5894f87199 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/metadata/ClusterMetadataServiceTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/metadata/ClusterMetadataServiceTest.java
@@ -18,10 +18,16 @@
package org.apache.rocketmq.proxy.service.metadata;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.attribute.TopicMessageType;
import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
import org.apache.rocketmq.proxy.service.BaseServiceTest;
+import org.apache.rocketmq.proxy.service.route.MessageQueueView;
+import org.apache.rocketmq.remoting.protocol.route.BrokerData;
import
org.apache.rocketmq.remoting.protocol.statictopic.TopicConfigAndQueueMapping;
import
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
import org.junit.Before;
@@ -29,6 +35,7 @@ import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
@@ -38,6 +45,8 @@ public class ClusterMetadataServiceTest extends BaseServiceTest {
private ClusterMetadataService clusterMetadataService;
+ protected static final String BROKER2_ADDR = "127.0.0.2:10911";
+
@Before
public void before() throws Throwable {
super.before();
@@ -51,6 +60,16 @@ public class ClusterMetadataServiceTest extends BaseServiceTest {
when(this.mqClientAPIExt.getSubscriptionGroupConfig(anyString(),
eq(GROUP), anyLong())).thenReturn(new SubscriptionGroupConfig());
this.clusterMetadataService = new
ClusterMetadataService(this.topicRouteService, this.mqClientAPIFactory);
+
+ BrokerData brokerData2 = new BrokerData();
+ brokerData2.setBrokerName("brokerName2");
+ HashMap<Long, String> addrs = new HashMap<>();
+ addrs.put(MixAll.MASTER_ID, BROKER2_ADDR);
+ brokerData2.setBrokerAddrs(addrs);
+ brokerData2.setCluster(CLUSTER_NAME);
+ topicRouteData.getBrokerDatas().add(brokerData2);
+ when(this.topicRouteService.getAllMessageQueueView(any(),
eq(TOPIC))).thenReturn(new MessageQueueView(CLUSTER_NAME, topicRouteData,
null));
+
}
@Test
@@ -70,4 +89,22 @@ public class ClusterMetadataServiceTest extends BaseServiceTest {
assertNotNull(this.clusterMetadataService.getSubscriptionGroupConfig(ctx,
GROUP));
assertEquals(1,
this.clusterMetadataService.subscriptionGroupConfigCache.asMap().size());
}
+
+ @Test
+ public void findOneBroker() {
+
+ Set<String> resultBrokerNames = new HashSet<>();
+ // run 1000 times to test the random
+ for (int i = 0; i < 1000; i++) {
+ Optional<BrokerData> brokerData = null;
+ try {
+ brokerData = this.clusterMetadataService.findOneBroker(TOPIC);
+ resultBrokerNames.add(brokerData.get().getBrokerName());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ // we should choose two brokers
+ assertEquals(2, resultBrokerNames.size());
+ }
}