This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 926bb69 [pulsar-broker] add pending read subscription metrics to
stats-internal (#9788)
926bb69 is described below
commit 926bb69c953fe203f31ec72b03bdd64bb68063cd
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Sun Mar 7 18:21:04 2021 -0800
[pulsar-broker] add pending read subscription metrics to stats-internal
(#9788)
---
.../pulsar/broker/service/persistent/PersistentTopic.java | 14 ++++++++++++++
.../pulsar/stats/client/PulsarBrokerStatsClientTest.java | 4 +++-
.../common/policies/data/PersistentTopicInternalStats.java | 2 ++
3 files changed, 19 insertions(+), 1 deletion(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 61b488b..25efc0f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1811,6 +1811,20 @@ public class PersistentTopic extends AbstractTopic
cs.numberOfEntriesSinceFirstNotAckedMessage =
cursor.getNumberOfEntriesSinceFirstNotAckedMessage();
cs.totalNonContiguousDeletedMessagesRange =
cursor.getTotalNonContiguousDeletedMessagesRange();
cs.properties = cursor.getProperties();
+ // subscription metrics
+ PersistentSubscription sub =
subscriptions.get(Codec.decode(c.getName()));
+ if (sub != null) {
+ if (sub.getDispatcher() instanceof
PersistentDispatcherMultipleConsumers) {
+ PersistentDispatcherMultipleConsumers dispatcher =
(PersistentDispatcherMultipleConsumers) sub
+ .getDispatcher();
+ cs.subscriptionHavePendingRead =
dispatcher.havePendingRead;
+ cs.subscriptionHavePendingReplayRead =
dispatcher.havePendingReplayRead;
+ } else if (sub.getDispatcher() instanceof
PersistentDispatcherSingleActiveConsumer) {
+ PersistentDispatcherSingleActiveConsumer dispatcher =
(PersistentDispatcherSingleActiveConsumer) sub
+ .getDispatcher();
+ cs.subscriptionHavePendingRead =
dispatcher.havePendingRead;
+ }
+ }
stats.cursors.put(cursor.getName(), cs);
});
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/stats/client/PulsarBrokerStatsClientTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/stats/client/PulsarBrokerStatsClientTest.java
index 0bdb2fe..ade1cc8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/stats/client/PulsarBrokerStatsClientTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/stats/client/PulsarBrokerStatsClientTest.java
@@ -44,6 +44,7 @@ import javax.ws.rs.ServerErrorException;
import java.net.URL;
import java.util.concurrent.TimeUnit;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
@@ -128,7 +129,8 @@ public class PulsarBrokerStatsClientTest extends ProducerConsumerBase {
assertEquals(cursor.numberOfEntriesSinceFirstNotAckedMessage,
numberOfMsgs);
assertTrue(cursor.totalNonContiguousDeletedMessagesRange > 0
&& (cursor.totalNonContiguousDeletedMessagesRange) <
numberOfMsgs / 2);
-
+ assertFalse(cursor.subscriptionHavePendingRead);
+ assertFalse(cursor.subscriptionHavePendingReplayRead);
producer.close();
consumer.close();
log.info("-- Exiting {} test --", methodName);
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistentTopicInternalStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistentTopicInternalStats.java
index 4bd61c0..c9b5ae5 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistentTopicInternalStats.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistentTopicInternalStats.java
@@ -76,6 +76,8 @@ public class PersistentTopicInternalStats {
public String state;
public long numberOfEntriesSinceFirstNotAckedMessage;
public int totalNonContiguousDeletedMessagesRange;
+ public boolean subscriptionHavePendingRead;
+ public boolean subscriptionHavePendingReplayRead;
public Map<String, Long> properties;
}