This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit f68facc7dd71895ab176aa97e2d2ded2bbc89749 Author: limingnihao <[email protected]> AuthorDate: Tue Feb 9 09:32:25 2021 +0800 [Issue9364] Fix the msgDelayed metric not being aggregated for partitioned topics (#9529) Fixes #9364 ### Motivation When using the deliverAfter interface on a partitioned topic, the msgDelayed metric is always reported as 0 because it is not aggregated across partitions. ### Modifications The value of msgDelayed is now aggregated in the Add method of SubscriptionStats. ### Verifying this change - [ ] Make sure that the change passes the CI checks. This change added tests and can be verified as follows: run testPartitionedTopicMsgDelayedAggregated in AdminApiTest. (cherry picked from commit c7f19f571cb9edb9b8f7a1c0e9136062d411c037) --- .../apache/pulsar/broker/admin/AdminApiTest.java | 59 ++++++++++++++++++++++ .../common/policies/data/SubscriptionStats.java | 1 + 2 files changed, 60 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java index 5fa94d3..3f03da3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java @@ -2588,4 +2588,63 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest { Assert.assertEquals(consumerStats.getReadPositionWhenJoining(), PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId() + 1).toString()); } + + @Test + public void testPartitionedTopicMsgDelayedAggregated() throws Exception { + final String topic = "persistent://prop-xyz/ns1/testPartitionedTopicMsgDelayedAggregated-" + UUID.randomUUID().toString(); + final String subName = "my-sub"; + final int numPartitions = 2; + conf.setSubscriptionRedeliveryTrackerEnabled(true); + conf.setDelayedDeliveryEnabled(true); + admin.topics().createPartitionedTopic(topic, numPartitions); + + for (int i = 0; i < 2; i++) { + pulsarClient.newConsumer() + .topic(topic) + 
.subscriptionType(SubscriptionType.Shared) + .subscriptionName(subName) + .subscribe(); + } + + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topic) + .enableBatching(false) + .create(); + + final int messages = 100; + for (int i = 0; i < messages; i++) { + String msg = "Hello Pulsar - " + i; + producer.send(msg.getBytes()); + producer.newMessage().deliverAfter(1L, TimeUnit.HOURS).value(msg.getBytes()).send(); + } + PartitionedTopicStats partitionedTopicStats = admin.topics().getPartitionedStats(topic, false); + Assert.assertNotNull(partitionedTopicStats); + SubscriptionStats subStats = partitionedTopicStats.subscriptions.get(subName); + Assert.assertNotNull(subStats); + Assert.assertEquals(subStats.msgBacklog, subStats.msgBacklogNoDelayed + subStats.msgDelayed); + + partitionedTopicStats = admin.topics().getPartitionedStats(topic, true); + Assert.assertNotNull(partitionedTopicStats); + subStats = partitionedTopicStats.subscriptions.get(subName); + Assert.assertNotNull(subStats); + Assert.assertEquals(subStats.msgBacklog, subStats.msgBacklogNoDelayed + subStats.msgDelayed); + Assert.assertNotNull(partitionedTopicStats.partitions); + Assert.assertEquals(partitionedTopicStats.partitions.size(), numPartitions); + + long sumMsgBacklog = 0; + long sumMsgBacklogNoDelayed = 0; + long sumMsgDelayed = 0; + for(TopicStats stats: partitionedTopicStats.partitions.values()){ + Assert.assertNotNull(stats); + SubscriptionStats partitionedSubStats = stats.subscriptions.get(subName); + Assert.assertNotNull(partitionedSubStats); + sumMsgBacklog += partitionedSubStats.msgBacklog; + sumMsgBacklogNoDelayed += partitionedSubStats.msgBacklogNoDelayed; + sumMsgDelayed += partitionedSubStats.msgDelayed; + } + Assert.assertEquals(sumMsgBacklog, sumMsgBacklogNoDelayed + sumMsgDelayed); + Assert.assertEquals(sumMsgBacklog, subStats.msgBacklog); + Assert.assertEquals(sumMsgBacklogNoDelayed, subStats.msgBacklogNoDelayed); + Assert.assertEquals(sumMsgDelayed, 
subStats.msgDelayed); + } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java index cd3d6d1..1064b69 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java @@ -145,6 +145,7 @@ public class SubscriptionStats { this.msgRateRedeliver += stats.msgRateRedeliver; this.msgBacklog += stats.msgBacklog; this.msgBacklogNoDelayed += stats.msgBacklogNoDelayed; + this.msgDelayed += stats.msgDelayed; this.unackedMessages += stats.unackedMessages; this.msgRateExpired += stats.msgRateExpired; this.totalMsgExpired += stats.totalMsgExpired;
