This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 03ea94bbea8 [improve][broker] Add msgInReplay subscription stat and metric to improve Key_Shared observability (#23224)
03ea94bbea8 is described below

commit 03ea94bbea81494e826cf21a6718aaa4759e8428
Author: Lari Hotari <[email protected]>
AuthorDate: Thu Aug 29 07:39:09 2024 +0300

    [improve][broker] Add msgInReplay subscription stat and metric to improve Key_Shared observability (#23224)
    
    (cherry picked from commit 59424a831b3ab973c607ba17e9ec6dcebd6e9ee5)
---
 .../broker/service/persistent/MessageRedeliveryController.java   | 9 +++++++++
 .../persistent/PersistentDispatcherMultipleConsumers.java        | 5 ++++-
 .../pulsar/broker/service/persistent/PersistentSubscription.java | 1 +
 .../pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java | 4 ++++
 .../broker/stats/prometheus/AggregatedSubscriptionStats.java     | 2 ++
 .../pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java | 3 +++
 .../org/apache/pulsar/broker/stats/prometheus/TopicStats.java    | 2 ++
 .../apache/pulsar/common/policies/data/SubscriptionStats.java    | 3 +++
 .../pulsar/common/policies/data/stats/SubscriptionStatsImpl.java | 6 ++++++
 9 files changed, 34 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
index 63803177242..5ca052616a5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
@@ -148,4 +148,13 @@ public class MessageRedeliveryController {
     public NavigableSet<PositionImpl> getMessagesToReplayNow(int maxMessagesToRead) {
         return messagesToRedeliver.items(maxMessagesToRead, PositionImpl::new);
     }
+
+    /**
+     * Get the number of messages registered for replay in the redelivery controller.
+     *
+     * @return number of messages
+     */
+    public int size() {
+        return messagesToRedeliver.size();
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 6e294cc7db1..2983639c76a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -135,7 +135,6 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
     protected final ExecutorService dispatchMessagesThread;
     private final SharedConsumerAssignor assignor;
 
-
     protected enum ReadType {
         Normal, Replay
     }
@@ -1352,5 +1351,9 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
         return subscription;
     }
 
+    public long getNumberOfMessagesInReplay() {
+        return redeliveryMessages.size();
+    }
+
     private static final Logger log = LoggerFactory.getLogger(PersistentDispatcherMultipleConsumers.class);
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index a4f6fdc5427..fde6a0716eb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -1296,6 +1296,7 @@ public class PersistentSubscription extends AbstractSubscription {
                 subStats.unackedMessages = d.getTotalUnackedMessages();
                 subStats.blockedSubscriptionOnUnackedMsgs = d.isBlockedDispatcherOnUnackedMsgs();
                 subStats.msgDelayed = d.getNumberOfDelayedMessages();
+                subStats.msgInReplay = d.getNumberOfMessagesInReplay();
             }
         }
         subStats.msgBacklog = getNumberOfEntriesInBacklog(getStatsOptions.isGetPreciseBacklog());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
index 3975cd89cfa..91648c27627 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
@@ -43,6 +43,7 @@ public class AggregatedNamespaceStats {
     public ManagedLedgerStats managedLedgerStats = new ManagedLedgerStats();
     public long msgBacklog;
     public long msgDelayed;
+    public long msgInReplay;
 
     public long ongoingTxnCount;
     public long abortedTxnCount;
@@ -140,10 +141,12 @@ public class AggregatedNamespaceStats {
             AggregatedSubscriptionStats subsStats =
                     subscriptionStats.computeIfAbsent(n, k -> new AggregatedSubscriptionStats());
             msgDelayed += as.msgDelayed;
+            msgInReplay += as.msgInReplay;
             subsStats.blockedSubscriptionOnUnackedMsgs = as.blockedSubscriptionOnUnackedMsgs;
             subsStats.msgBacklog += as.msgBacklog;
             subsStats.msgBacklogNoDelayed += as.msgBacklogNoDelayed;
             subsStats.msgDelayed += as.msgDelayed;
+            subsStats.msgInReplay += as.msgInReplay;
             subsStats.msgRateRedeliver += as.msgRateRedeliver;
             subsStats.unackedMessages += as.unackedMessages;
             subsStats.filterProcessedMsgCount += as.filterProcessedMsgCount;
@@ -199,6 +202,7 @@ public class AggregatedNamespaceStats {
 
         msgBacklog = 0;
         msgDelayed = 0;
+        msgInReplay = 0;
         ongoingTxnCount = 0;
         abortedTxnCount = 0;
         committedTxnCount = 0;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java
index da0324c5565..b713146f58b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java
@@ -43,6 +43,8 @@ public class AggregatedSubscriptionStats {
 
     public long msgDelayed;
 
+    public long msgInReplay;
+
     long msgOutCounter;
 
     long bytesOutCounter;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
index 3bbc9100b36..440666c4ec9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
@@ -134,6 +134,7 @@ public class NamespaceStatsAggregator {
         subsStats.msgOutCounter = subscriptionStats.msgOutCounter;
         subsStats.msgBacklog = subscriptionStats.msgBacklog;
         subsStats.msgDelayed = subscriptionStats.msgDelayed;
+        subsStats.msgInReplay = subscriptionStats.msgInReplay;
         subsStats.msgRateExpired = subscriptionStats.msgRateExpired;
         subsStats.totalMsgExpired = subscriptionStats.totalMsgExpired;
         subsStats.msgBacklogNoDelayed = subsStats.msgBacklog - subsStats.msgDelayed;
@@ -420,6 +421,8 @@ public class NamespaceStatsAggregator {
 
         writeMetric(stream, "pulsar_subscription_delayed", stats.msgDelayed, cluster, namespace);
 
+        writeMetric(stream, "pulsar_subscription_in_replay", stats.msgInReplay, cluster, namespace);
+
         writeMetric(stream, "pulsar_delayed_message_index_size_bytes", stats.delayedMessageIndexSizeInBytes, cluster,
                 namespace);
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
index 9eb4077225c..5d9e21113e6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
@@ -310,6 +310,8 @@ class TopicStats {
                     subsStats.msgBacklogNoDelayed, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
             writeSubscriptionMetric(stream, "pulsar_subscription_delayed",
                     subsStats.msgDelayed, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
+            writeSubscriptionMetric(stream, "pulsar_subscription_in_replay",
+                    subsStats.msgInReplay, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
             writeSubscriptionMetric(stream, "pulsar_subscription_msg_rate_redeliver",
                     subsStats.msgRateRedeliver, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
             writeSubscriptionMetric(stream, "pulsar_subscription_unacked_messages",
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
index d4850adaa6f..ce3a080a855 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
@@ -66,6 +66,9 @@ public interface SubscriptionStats {
     /** Number of delayed messages currently being tracked. */
     long getMsgDelayed();
 
+    /** Number of messages registered for replay. */
+    long getMsgInReplay();
+
     /**
      * Number of unacknowledged messages for the subscription, where an unacknowledged message is one that has been
      * sent to a consumer but not yet acknowledged. Calculated by summing all {@link ConsumerStats#getUnackedMessages()}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java
index a8ea0060629..12734a5586c 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java
@@ -74,6 +74,9 @@ public class SubscriptionStatsImpl implements SubscriptionStats {
     /** Number of delayed messages currently being tracked. */
     public long msgDelayed;
 
+    /** Number of messages registered for replay. */
+    public long msgInReplay;
+
     /**
      * Number of unacknowledged messages for the subscription, where an unacknowledged message is one that has been
      * sent to a consumer but not yet acknowledged. Calculated by summing all {@link ConsumerStatsImpl#unackedMessages}
@@ -167,6 +170,8 @@ public class SubscriptionStatsImpl implements SubscriptionStats {
         msgBacklog = 0;
         backlogSize = 0;
         msgBacklogNoDelayed = 0;
+        msgDelayed = 0;
+        msgInReplay = 0;
         unackedMessages = 0;
         type = null;
         msgRateExpired = 0;
@@ -202,6 +207,7 @@ public class SubscriptionStatsImpl implements SubscriptionStats {
         this.backlogSize += stats.backlogSize;
         this.msgBacklogNoDelayed += stats.msgBacklogNoDelayed;
         this.msgDelayed += stats.msgDelayed;
+        this.msgInReplay += stats.msgInReplay;
         this.unackedMessages += stats.unackedMessages;
         this.type = stats.type;
         this.msgRateExpired += stats.msgRateExpired;

Reply via email to