massakam commented on a change in pull request #12401:
URL: https://github.com/apache/pulsar/pull/12401#discussion_r744416170
##########
File path:
pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
##########
@@ -2231,4 +2232,170 @@ public void testFailedUpdatePartitionedTopic() throws
Exception {
// validate subscription is created for new partition.
assertNotNull(admin.topics().getStats(partitionedTopicName +
"-partition-" + 6).getSubscriptions().get(subName1));
}
+
+ @Test
+ public void testPartitionedStatsAggregationByProducerName() throws
Exception {
+ final String topic =
"persistent://prop-xyz/ns1/test-partitioned-stats-aggregation-by-producer-name";
+ admin.topics().createPartitionedTopic(topic, 10);
+
+ @Cleanup
+ PulsarClient client =
PulsarClient.builder().serviceUrl(pulsar.getWebServiceAddress()).build();
+
+ @Cleanup
+ Producer<byte[]> producer1 = client.newProducer()
+ .topic(topic)
+ .enableLazyStartPartitionedProducers(true)
+ .enableBatching(false)
+ .messageRoutingMode(MessageRoutingMode.CustomPartition)
+ .messageRouter(new MessageRouter() {
+ @Override
+ public int choosePartition(Message<?> msg, TopicMetadata
metadata) {
+ return msg.hasKey() ? Integer.parseInt(msg.getKey()) :
0;
+ }
+ })
+ .accessMode(ProducerAccessMode.Shared)
+ .create();
+
+ @Cleanup
+ Producer<byte[]> producer2 = client.newProducer()
+ .topic(topic)
+ .enableLazyStartPartitionedProducers(true)
+ .enableBatching(false)
+ .messageRoutingMode(MessageRoutingMode.CustomPartition)
+ .messageRouter(new MessageRouter() {
+ @Override
+ public int choosePartition(Message<?> msg, TopicMetadata
metadata) {
+ return msg.hasKey() ? Integer.parseInt(msg.getKey()) :
5;
+ }
+ })
+ .accessMode(ProducerAccessMode.Shared)
+ .create();
+
+ for (int i = 0; i < 10; i++) {
+ producer1.newMessage()
+ .key(String.valueOf(i % 5))
+ .value(("message".getBytes(StandardCharsets.UTF_8)))
+ .send();
+ producer2.newMessage()
+ .key(String.valueOf(i % 5 + 5))
+ .value(("message".getBytes(StandardCharsets.UTF_8)))
+ .send();
+ }
+
+ PartitionedTopicStats topicStats =
admin.topics().getPartitionedStats(topic, true);
+ assertEquals(topicStats.getPartitions().size(), 10);
+ assertEquals(topicStats.getPartitions().values().stream().mapToInt(e
-> e.getPublishers().size()).sum(), 10);
+ assertEquals(topicStats.getPartitions().values().stream().map(e ->
e.getPublishers().get(0).getProducerName()).distinct().count(), 2);
+ assertEquals(topicStats.getPublishers().size(), 2);
+ topicStats.getPublishers().forEach(p ->
assertTrue(p.isPartialProducerSupported()));
+ }
+
+ @Test
+ public void testPartitionedStatsAggregationByProducerNamePerPartition()
throws Exception {
+ final String topic =
"persistent://prop-xyz/ns1/test-partitioned-stats-aggregation-by-producer-name-per-pt";
+ admin.topics().createPartitionedTopic(topic, 2);
+
+ @Cleanup
+ PulsarClient client =
PulsarClient.builder().serviceUrl(pulsar.getWebServiceAddress()).build();
+
+ @Cleanup
+ Producer<byte[]> producer1 = client.newProducer()
+ .topic(topic + TopicName.PARTITIONED_TOPIC_SUFFIX + 0)
+ .create();
+
+ @Cleanup
+ Producer<byte[]> producer2 = client.newProducer()
+ .topic(topic + TopicName.PARTITIONED_TOPIC_SUFFIX + 1)
+ .create();
+
+ PartitionedTopicStats topicStats =
admin.topics().getPartitionedStats(topic, true);
+ assertEquals(topicStats.getPartitions().size(), 2);
+ assertEquals(topicStats.getPartitions().values().stream().mapToInt(e
-> e.getPublishers().size()).sum(), 2);
+ assertEquals(topicStats.getPartitions().values().stream().map(e ->
e.getPublishers().get(0).getProducerName()).distinct().count(), 2);
+ assertEquals(topicStats.getPublishers().size(), 2);
+ topicStats.getPublishers().forEach(p ->
assertTrue(p.isPartialProducerSupported()));
+ }
+
+ @Test
+ public void testPartitionedStatsAggregationByProducerNameNonPersistent()
throws Exception {
Review comment:
The only difference between this and
`testPartitionedStatsAggregationByProducerName()` seems to be whether the topic
is persistent or non-persistent. Couldn't we cover both test cases with a
single method, e.g. by using `@DataProvider`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]