eolivelli commented on code in PR #19502:
URL: https://github.com/apache/pulsar/pull/19502#discussion_r1104660692
##########
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java:
##########
@@ -1304,4 +1305,55 @@ public void testPartitionsUpdatesForMultipleTopics()
throws Exception {
});
}
+ @Test
+ public void testTopicsDistribution() throws Exception {
+ final String topic = "topics-distribution";
+ final int topicCount = 100;
+ final int consumers = 10;
+
+ for (int i = 0; i < topicCount; i++) {
+ admin.topics().createNonPartitionedTopic(topic + "-" + i);
+ }
+
+ CustomizedConsumerEventListener eventListener = new
CustomizedConsumerEventListener();
+
+ List<Consumer<?>> consumerList = new ArrayList<>(consumers);
+ for (int i = 0; i < consumers; i++) {
+ consumerList.add(pulsarClient.newConsumer()
+ .topics(IntStream.range(0, topicCount).mapToObj(j -> topic
+ "-" + j).toList())
+ .subscriptionType(SubscriptionType.Failover)
+ .subscriptionName("my-sub")
+ .consumerName("consumer-" + i)
+ .consumerEventListener(eventListener)
+ .subscribe());
+ }
+
+ log.info("Topics are distributed to consumers as {}",
eventListener.getActiveConsumers());
+ Map<String, Integer> assigned = new HashMap<>();
+ eventListener.getActiveConsumers().forEach((k, v) ->
assigned.compute(v, (t, c) -> c == null ? 1 : ++ c));
+ assertEquals(assigned.size(), 10);
Review Comment:
do you want this to be "consumers" instead of 10 ?
this way the test is clearer
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]