This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 926bb69  [pulsar-broker] add pending read subscription metrics to 
stats-internal (#9788)
926bb69 is described below

commit 926bb69c953fe203f31ec72b03bdd64bb68063cd
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Sun Mar 7 18:21:04 2021 -0800

    [pulsar-broker] add pending read subscription metrics to stats-internal 
(#9788)
---
 .../pulsar/broker/service/persistent/PersistentTopic.java  | 14 ++++++++++++++
 .../pulsar/stats/client/PulsarBrokerStatsClientTest.java   |  4 +++-
 .../common/policies/data/PersistentTopicInternalStats.java |  2 ++
 3 files changed, 19 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 61b488b..25efc0f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1811,6 +1811,20 @@ public class PersistentTopic extends AbstractTopic
             cs.numberOfEntriesSinceFirstNotAckedMessage = 
cursor.getNumberOfEntriesSinceFirstNotAckedMessage();
             cs.totalNonContiguousDeletedMessagesRange = 
cursor.getTotalNonContiguousDeletedMessagesRange();
             cs.properties = cursor.getProperties();
+            // subscription metrics
+            PersistentSubscription sub = 
subscriptions.get(Codec.decode(c.getName()));
+            if (sub != null) {
+                if (sub.getDispatcher() instanceof 
PersistentDispatcherMultipleConsumers) {
+                    PersistentDispatcherMultipleConsumers dispatcher = 
(PersistentDispatcherMultipleConsumers) sub
+                            .getDispatcher();
+                    cs.subscriptionHavePendingRead = 
dispatcher.havePendingRead;
+                    cs.subscriptionHavePendingReplayRead = 
dispatcher.havePendingReplayRead;
+                } else if (sub.getDispatcher() instanceof 
PersistentDispatcherSingleActiveConsumer) {
+                    PersistentDispatcherSingleActiveConsumer dispatcher = 
(PersistentDispatcherSingleActiveConsumer) sub
+                            .getDispatcher();
+                    cs.subscriptionHavePendingRead = 
dispatcher.havePendingRead;
+                }
+            }
             stats.cursors.put(cursor.getName(), cs);
         });
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/stats/client/PulsarBrokerStatsClientTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/stats/client/PulsarBrokerStatsClientTest.java
index 0bdb2fe..ade1cc8 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/stats/client/PulsarBrokerStatsClientTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/stats/client/PulsarBrokerStatsClientTest.java
@@ -44,6 +44,7 @@ import javax.ws.rs.ServerErrorException;
 import java.net.URL;
 import java.util.concurrent.TimeUnit;
 
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.mockito.Mockito.spy;
 import static org.testng.Assert.assertEquals;
@@ -128,7 +129,8 @@ public class PulsarBrokerStatsClientTest extends 
ProducerConsumerBase {
         assertEquals(cursor.numberOfEntriesSinceFirstNotAckedMessage, 
numberOfMsgs);
         assertTrue(cursor.totalNonContiguousDeletedMessagesRange > 0
                 && (cursor.totalNonContiguousDeletedMessagesRange) < 
numberOfMsgs / 2);
-
+        assertFalse(cursor.subscriptionHavePendingRead);
+        assertFalse(cursor.subscriptionHavePendingReplayRead);
         producer.close();
         consumer.close();
         log.info("-- Exiting {} test --", methodName);
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistentTopicInternalStats.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistentTopicInternalStats.java
index 4bd61c0..c9b5ae5 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistentTopicInternalStats.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistentTopicInternalStats.java
@@ -76,6 +76,8 @@ public class PersistentTopicInternalStats {
         public String state;
         public long numberOfEntriesSinceFirstNotAckedMessage;
         public int totalNonContiguousDeletedMessagesRange;
+        public boolean subscriptionHavePendingRead;
+        public boolean subscriptionHavePendingReplayRead;
 
         public Map<String, Long> properties;
     }

Reply via email to