coderzc commented on code in PR #25031:
URL: https://github.com/apache/pulsar/pull/25031#discussion_r2670594237
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java:
##########
@@ -1130,4 +1130,50 @@ public void testRemoveNonExistBundleData()
assertFalse(bundlesAfterSplit.contains(bundleWillBeSplit.getBundleRange()));
}
+ @Test
+ public void testRepeatSplitBundle() throws Exception {
+ final String cluster = "use";
+ final String tenant = "my-tenant";
+ final String namespace = "repeat-split-bundle";
+ final String topicName = tenant + "/" + namespace + "/" + "topic";
+ int bundleNumbers = 8;
+
+ admin1.clusters().createCluster(cluster, ClusterData.builder()
+ .serviceUrl(pulsar1.getWebServiceAddress()).build());
+ admin1.tenants().createTenant(tenant,
+ new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"),
Sets.newHashSet(cluster)));
+ admin1.namespaces().createNamespace(tenant + "/" + namespace,
bundleNumbers);
+
+ LoadData loadData = (LoadData) getField(primaryLoadManager,
"loadData");
+ LocalBrokerData localData = (LocalBrokerData)
getField(primaryLoadManager, "localData");
+
+ @Cleanup
+ PulsarClient pulsarClient =
PulsarClient.builder().serviceUrl(pulsar1.getBrokerServiceUrl()).build();
+
+ // create a lot of topic to fully distributed among bundles.
+ for (int i = 0; i < 10; i++) {
+ String topicNameI = topicName + i;
+ admin1.topics().createPartitionedTopic(topicNameI, 20);
+ // trigger bundle assignment
+
+ pulsarClient.newConsumer().topic(topicNameI)
+ .subscriptionName("my-subscriber-name2").subscribe();
+ }
+
+ String topicToFindBundle = topicName + 0;
+ NamespaceBundle realBundle =
pulsar1.getNamespaceService().getBundle(TopicName.get(topicToFindBundle));
+ String bundleKey = realBundle.toString();
+ log.info("Before bundle={}", bundleKey);
+
+ NamespaceBundleStats stats = new NamespaceBundleStats();
+ stats.msgRateIn = 100000.0;
+ localData.getLastStats().put(bundleKey, stats);
+ pulsar1.getBrokerService().updateRates();
+
+ primaryLoadManager.updateAll();
+
+ primaryLoadManager.updateAll();
+ Assert.assertFalse(loadData.getBundleData().containsKey(bundleKey));
Review Comment:
If an exception is thrown, the test will fail.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]