massakam commented on a change in pull request #12401:
URL: https://github.com/apache/pulsar/pull/12401#discussion_r744416170



##########
File path: 
pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
##########
@@ -2231,4 +2232,170 @@ public void testFailedUpdatePartitionedTopic() throws 
Exception {
         // validate subscription is created for new partition.
         assertNotNull(admin.topics().getStats(partitionedTopicName + 
"-partition-" + 6).getSubscriptions().get(subName1));
     }
+
+    @Test
+    public void testPartitionedStatsAggregationByProducerName() throws 
Exception {
+        final String topic = 
"persistent://prop-xyz/ns1/test-partitioned-stats-aggregation-by-producer-name";
+        admin.topics().createPartitionedTopic(topic, 10);
+
+        @Cleanup
+        PulsarClient client = 
PulsarClient.builder().serviceUrl(pulsar.getWebServiceAddress()).build();
+
+        @Cleanup
+        Producer<byte[]> producer1 = client.newProducer()
+                .topic(topic)
+                .enableLazyStartPartitionedProducers(true)
+                .enableBatching(false)
+                .messageRoutingMode(MessageRoutingMode.CustomPartition)
+                .messageRouter(new MessageRouter() {
+                    @Override
+                    public int choosePartition(Message<?> msg, TopicMetadata 
metadata) {
+                        return msg.hasKey() ? Integer.parseInt(msg.getKey()) : 
0;
+                    }
+                })
+                .accessMode(ProducerAccessMode.Shared)
+                .create();
+
+        @Cleanup
+        Producer<byte[]> producer2 = client.newProducer()
+                .topic(topic)
+                .enableLazyStartPartitionedProducers(true)
+                .enableBatching(false)
+                .messageRoutingMode(MessageRoutingMode.CustomPartition)
+                .messageRouter(new MessageRouter() {
+                    @Override
+                    public int choosePartition(Message<?> msg, TopicMetadata 
metadata) {
+                        return msg.hasKey() ? Integer.parseInt(msg.getKey()) : 
5;
+                    }
+                })
+                .accessMode(ProducerAccessMode.Shared)
+                .create();
+
+        for (int i = 0; i < 10; i++) {
+            producer1.newMessage()
+                    .key(String.valueOf(i % 5))
+                    .value(("message".getBytes(StandardCharsets.UTF_8)))
+                    .send();
+            producer2.newMessage()
+                    .key(String.valueOf(i % 5 + 5))
+                    .value(("message".getBytes(StandardCharsets.UTF_8)))
+                    .send();
+        }
+
+        PartitionedTopicStats topicStats = 
admin.topics().getPartitionedStats(topic, true);
+        assertEquals(topicStats.getPartitions().size(), 10);
+        assertEquals(topicStats.getPartitions().values().stream().mapToInt(e 
-> e.getPublishers().size()).sum(), 10);
+        assertEquals(topicStats.getPartitions().values().stream().map(e -> 
e.getPublishers().get(0).getProducerName()).distinct().count(), 2);
+        assertEquals(topicStats.getPublishers().size(), 2);
+        topicStats.getPublishers().forEach(p -> 
assertTrue(p.isPartialProducerSupported()));
+    }
+
+    @Test
+    public void testPartitionedStatsAggregationByProducerNamePerPartition() 
throws Exception {
+        final String topic = 
"persistent://prop-xyz/ns1/test-partitioned-stats-aggregation-by-producer-name-per-pt";
+        admin.topics().createPartitionedTopic(topic, 2);
+
+        @Cleanup
+        PulsarClient client = 
PulsarClient.builder().serviceUrl(pulsar.getWebServiceAddress()).build();
+
+        @Cleanup
+        Producer<byte[]> producer1 = client.newProducer()
+                .topic(topic + TopicName.PARTITIONED_TOPIC_SUFFIX + 0)
+                .create();
+
+        @Cleanup
+        Producer<byte[]> producer2 = client.newProducer()
+                .topic(topic + TopicName.PARTITIONED_TOPIC_SUFFIX + 1)
+                .create();
+
+        PartitionedTopicStats topicStats = 
admin.topics().getPartitionedStats(topic, true);
+        assertEquals(topicStats.getPartitions().size(), 2);
+        assertEquals(topicStats.getPartitions().values().stream().mapToInt(e 
-> e.getPublishers().size()).sum(), 2);
+        assertEquals(topicStats.getPartitions().values().stream().map(e -> 
e.getPublishers().get(0).getProducerName()).distinct().count(), 2);
+        assertEquals(topicStats.getPublishers().size(), 2);
+        topicStats.getPublishers().forEach(p -> 
assertTrue(p.isPartialProducerSupported()));
+    }
+
+    @Test
+    public void testPartitionedStatsAggregationByProducerNameNonPersistent() 
throws Exception {

Review comment:
       The only difference between this test and 
`testPartitionedStatsAggregationByProducerName()` seems to be whether the topic 
is persistent or non-persistent. Could we cover both cases with a single test 
method parameterized via `@DataProvider` or a similar mechanism?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to