This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit f68facc7dd71895ab176aa97e2d2ded2bbc89749
Author: limingnihao <[email protected]>
AuthorDate: Tue Feb 9 09:32:25 2021 +0800

    [Issue9364] Fix msgDelayed metric not being aggregated for partitioned topics (#9529)
    
    Fixes #9364
    
    
    ### Motivation
    
    When messages are published with `deliverAfter` to a partitioned topic, the `msgDelayed` metric in the partitioned-topic stats is always reported as 0, because the per-partition values are never summed.
    
    ### Modifications
    
    Aggregate `msgDelayed` in the `add` method of `SubscriptionStats`, so that the per-partition values are summed into the partitioned-topic subscription stats.
    
    ### Verifying this change
    
    - [ ] Make sure that the change passes the CI checks.
    
    This change added a test and can be verified by running `AdminApiTest#testPartitionedTopicMsgDelayedAggregated`.
    
    
    (cherry picked from commit c7f19f571cb9edb9b8f7a1c0e9136062d411c037)
---
 .../apache/pulsar/broker/admin/AdminApiTest.java   | 59 ++++++++++++++++++++++
 .../common/policies/data/SubscriptionStats.java    |  1 +
 2 files changed, 60 insertions(+)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index 5fa94d3..3f03da3 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -2588,4 +2588,63 @@ public class AdminApiTest extends 
MockedPulsarServiceBaseTest {
         Assert.assertEquals(consumerStats.getReadPositionWhenJoining(),
                 PositionImpl.get(messageId.getLedgerId(), 
messageId.getEntryId() + 1).toString());
     }
+
+    @Test
+    public void testPartitionedTopicMsgDelayedAggregated() throws Exception {
+        final String topic = 
"persistent://prop-xyz/ns1/testPartitionedTopicMsgDelayedAggregated-" + 
UUID.randomUUID().toString();
+        final String subName = "my-sub";
+        final int numPartitions = 2;
+        conf.setSubscriptionRedeliveryTrackerEnabled(true);
+        conf.setDelayedDeliveryEnabled(true);
+        admin.topics().createPartitionedTopic(topic, numPartitions);
+
+        for (int i = 0; i < 2; i++) {
+            pulsarClient.newConsumer()
+                    .topic(topic)
+                    .subscriptionType(SubscriptionType.Shared)
+                    .subscriptionName(subName)
+                    .subscribe();
+        }
+
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .enableBatching(false)
+                .create();
+
+        final int messages = 100;
+        for (int i = 0; i < messages; i++) {
+            String msg = "Hello Pulsar - " + i;
+            producer.send(msg.getBytes());
+            producer.newMessage().deliverAfter(1L, 
TimeUnit.HOURS).value(msg.getBytes()).send();
+        }
+        PartitionedTopicStats partitionedTopicStats = 
admin.topics().getPartitionedStats(topic, false);
+        Assert.assertNotNull(partitionedTopicStats);
+        SubscriptionStats subStats = 
partitionedTopicStats.subscriptions.get(subName);
+        Assert.assertNotNull(subStats);
+        Assert.assertEquals(subStats.msgBacklog, subStats.msgBacklogNoDelayed 
+ subStats.msgDelayed);
+
+        partitionedTopicStats = admin.topics().getPartitionedStats(topic, 
true);
+        Assert.assertNotNull(partitionedTopicStats);
+        subStats = partitionedTopicStats.subscriptions.get(subName);
+        Assert.assertNotNull(subStats);
+        Assert.assertEquals(subStats.msgBacklog, subStats.msgBacklogNoDelayed 
+ subStats.msgDelayed);
+        Assert.assertNotNull(partitionedTopicStats.partitions);
+        Assert.assertEquals(partitionedTopicStats.partitions.size(), 
numPartitions);
+
+        long sumMsgBacklog = 0;
+        long sumMsgBacklogNoDelayed = 0;
+        long sumMsgDelayed = 0;
+        for(TopicStats stats: partitionedTopicStats.partitions.values()){
+            Assert.assertNotNull(stats);
+            SubscriptionStats partitionedSubStats = 
stats.subscriptions.get(subName);
+            Assert.assertNotNull(partitionedSubStats);
+            sumMsgBacklog += partitionedSubStats.msgBacklog;
+            sumMsgBacklogNoDelayed += partitionedSubStats.msgBacklogNoDelayed;
+            sumMsgDelayed += partitionedSubStats.msgDelayed;
+        }
+        Assert.assertEquals(sumMsgBacklog, sumMsgBacklogNoDelayed + 
sumMsgDelayed);
+        Assert.assertEquals(sumMsgBacklog, subStats.msgBacklog);
+        Assert.assertEquals(sumMsgBacklogNoDelayed, 
subStats.msgBacklogNoDelayed);
+        Assert.assertEquals(sumMsgDelayed, subStats.msgDelayed);
+    }
 }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
index cd3d6d1..1064b69 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
@@ -145,6 +145,7 @@ public class SubscriptionStats {
         this.msgRateRedeliver += stats.msgRateRedeliver;
         this.msgBacklog += stats.msgBacklog;
         this.msgBacklogNoDelayed += stats.msgBacklogNoDelayed;
+        this.msgDelayed += stats.msgDelayed;
         this.unackedMessages += stats.unackedMessages;
         this.msgRateExpired += stats.msgRateExpired;
         this.totalMsgExpired += stats.totalMsgExpired;

Reply via email to