This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 59424a831b3 [improve][broker] Add msgInReplay subscription stat and 
metric to improve Key_Shared observability (#23224)
59424a831b3 is described below

commit 59424a831b3ab973c607ba17e9ec6dcebd6e9ee5
Author: Lari Hotari <[email protected]>
AuthorDate: Thu Aug 29 07:39:09 2024 +0300

    [improve][broker] Add msgInReplay subscription stat and metric to improve 
Key_Shared observability (#23224)
---
 .../broker/service/persistent/MessageRedeliveryController.java   | 9 +++++++++
 .../persistent/PersistentDispatcherMultipleConsumers.java        | 5 ++++-
 .../pulsar/broker/service/persistent/PersistentSubscription.java | 1 +
 .../pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java | 4 ++++
 .../broker/stats/prometheus/AggregatedSubscriptionStats.java     | 2 ++
 .../pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java | 3 +++
 .../org/apache/pulsar/broker/stats/prometheus/TopicStats.java    | 2 ++
 .../apache/pulsar/common/policies/data/SubscriptionStats.java    | 3 +++
 .../pulsar/common/policies/data/stats/SubscriptionStatsImpl.java | 6 ++++++
 9 files changed, 34 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
index 526874a7ae3..9d29b93ca45 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
@@ -149,4 +149,13 @@ public class MessageRedeliveryController {
     public NavigableSet<Position> getMessagesToReplayNow(int 
maxMessagesToRead) {
         return messagesToRedeliver.items(maxMessagesToRead, 
PositionFactory::create);
     }
+
+    /**
+     * Get the number of messages registered for replay in the redelivery 
controller.
+     *
+     * @return number of messages
+     */
+    public int size() {
+        return messagesToRedeliver.size();
+    }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 274bdd9947a..20dbc4925d1 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -135,7 +135,6 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
     protected final ExecutorService dispatchMessagesThread;
     private final SharedConsumerAssignor assignor;
 
-
     protected enum ReadType {
         Normal, Replay
     }
@@ -1352,5 +1351,9 @@ public class PersistentDispatcherMultipleConsumers 
extends AbstractDispatcherMul
         return subscription;
     }
 
+    public long getNumberOfMessagesInReplay() {
+        return redeliveryMessages.size();
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(PersistentDispatcherMultipleConsumers.class);
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index f59ea18ce8e..ea1b7d7602b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -1276,6 +1276,7 @@ public class PersistentSubscription extends 
AbstractSubscription {
                 subStats.unackedMessages = d.getTotalUnackedMessages();
                 subStats.blockedSubscriptionOnUnackedMsgs = 
d.isBlockedDispatcherOnUnackedMsgs();
                 subStats.msgDelayed = d.getNumberOfDelayedMessages();
+                subStats.msgInReplay = d.getNumberOfMessagesInReplay();
             }
         }
         subStats.msgBacklog = 
getNumberOfEntriesInBacklog(getStatsOptions.isGetPreciseBacklog());
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
index 85ff15c915a..aaaea7b493e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
@@ -43,6 +43,7 @@ public class AggregatedNamespaceStats {
     public ManagedLedgerStats managedLedgerStats = new ManagedLedgerStats();
     public long msgBacklog;
     public long msgDelayed;
+    public long msgInReplay;
 
     public long ongoingTxnCount;
     public long abortedTxnCount;
@@ -141,10 +142,12 @@ public class AggregatedNamespaceStats {
             AggregatedSubscriptionStats subsStats =
                     subscriptionStats.computeIfAbsent(n, k -> new 
AggregatedSubscriptionStats());
             msgDelayed += as.msgDelayed;
+            msgInReplay += as.msgInReplay;
             subsStats.blockedSubscriptionOnUnackedMsgs = 
as.blockedSubscriptionOnUnackedMsgs;
             subsStats.msgBacklog += as.msgBacklog;
             subsStats.msgBacklogNoDelayed += as.msgBacklogNoDelayed;
             subsStats.msgDelayed += as.msgDelayed;
+            subsStats.msgInReplay += as.msgInReplay;
             subsStats.msgRateRedeliver += as.msgRateRedeliver;
             subsStats.unackedMessages += as.unackedMessages;
             subsStats.filterProcessedMsgCount += as.filterProcessedMsgCount;
@@ -200,6 +203,7 @@ public class AggregatedNamespaceStats {
 
         msgBacklog = 0;
         msgDelayed = 0;
+        msgInReplay = 0;
         ongoingTxnCount = 0;
         abortedTxnCount = 0;
         committedTxnCount = 0;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java
index da0324c5565..b713146f58b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java
@@ -43,6 +43,8 @@ public class AggregatedSubscriptionStats {
 
     public long msgDelayed;
 
+    public long msgInReplay;
+
     long msgOutCounter;
 
     long bytesOutCounter;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
index f0d11167e65..25c875778c0 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
@@ -134,6 +134,7 @@ public class NamespaceStatsAggregator {
         subsStats.msgOutCounter = subscriptionStats.msgOutCounter;
         subsStats.msgBacklog = subscriptionStats.msgBacklog;
         subsStats.msgDelayed = subscriptionStats.msgDelayed;
+        subsStats.msgInReplay = subscriptionStats.msgInReplay;
         subsStats.msgRateExpired = subscriptionStats.msgRateExpired;
         subsStats.totalMsgExpired = subscriptionStats.totalMsgExpired;
         subsStats.msgBacklogNoDelayed = subsStats.msgBacklog - 
subsStats.msgDelayed;
@@ -424,6 +425,8 @@ public class NamespaceStatsAggregator {
 
         writeMetric(stream, "pulsar_subscription_delayed", stats.msgDelayed, 
cluster, namespace);
 
+        writeMetric(stream, "pulsar_subscription_in_replay", 
stats.msgInReplay, cluster, namespace);
+
         writeMetric(stream, "pulsar_delayed_message_index_size_bytes", 
stats.delayedMessageIndexSizeInBytes, cluster,
                 namespace);
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
index 013b5287310..e54a3710e12 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
@@ -310,6 +310,8 @@ class TopicStats {
                     subsStats.msgBacklogNoDelayed, cluster, namespace, topic, 
sub, splitTopicAndPartitionIndexLabel);
             writeSubscriptionMetric(stream, "pulsar_subscription_delayed",
                     subsStats.msgDelayed, cluster, namespace, topic, sub, 
splitTopicAndPartitionIndexLabel);
+            writeSubscriptionMetric(stream, "pulsar_subscription_in_replay",
+                    subsStats.msgInReplay, cluster, namespace, topic, sub, 
splitTopicAndPartitionIndexLabel);
             writeSubscriptionMetric(stream, 
"pulsar_subscription_msg_rate_redeliver",
                     subsStats.msgRateRedeliver, cluster, namespace, topic, 
sub, splitTopicAndPartitionIndexLabel);
             writeSubscriptionMetric(stream, 
"pulsar_subscription_unacked_messages",
diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
index cabef1ca960..e307e41862e 100644
--- 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
@@ -66,6 +66,9 @@ public interface SubscriptionStats {
     /** Number of delayed messages currently being tracked. */
     long getMsgDelayed();
 
+    /** Number of messages registered for replay. */
+    long getMsgInReplay();
+
     /**
      * Number of unacknowledged messages for the subscription, where an 
unacknowledged message is one that has been
      * sent to a consumer but not yet acknowledged. Calculated by summing all 
{@link ConsumerStats#getUnackedMessages()}
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java
index ab4d07c7ae4..977ed28e868 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java
@@ -74,6 +74,9 @@ public class SubscriptionStatsImpl implements 
SubscriptionStats {
     /** Number of delayed messages currently being tracked. */
     public long msgDelayed;
 
+    /** Number of messages registered for replay. */
+    public long msgInReplay;
+
     /**
      * Number of unacknowledged messages for the subscription, where an 
unacknowledged message is one that has been
      * sent to a consumer but not yet acknowledged. Calculated by summing all 
{@link ConsumerStatsImpl#unackedMessages}
@@ -173,6 +176,8 @@ public class SubscriptionStatsImpl implements 
SubscriptionStats {
         msgBacklog = 0;
         backlogSize = 0;
         msgBacklogNoDelayed = 0;
+        msgDelayed = 0;
+        msgInReplay = 0;
         unackedMessages = 0;
         type = null;
         msgRateExpired = 0;
@@ -208,6 +213,7 @@ public class SubscriptionStatsImpl implements 
SubscriptionStats {
         this.backlogSize += stats.backlogSize;
         this.msgBacklogNoDelayed += stats.msgBacklogNoDelayed;
         this.msgDelayed += stats.msgDelayed;
+        this.msgInReplay += stats.msgInReplay;
         this.unackedMessages += stats.unackedMessages;
         this.type = stats.type;
         this.msgRateExpired += stats.msgRateExpired;

Reply via email to