This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 766d2a40719 [improve][broker] Add msgInReplay subscription stat and metric to improve Key_Shared observability (#23224)
766d2a40719 is described below
commit 766d2a407196533832184447c25498c6a82f7a86
Author: Lari Hotari <[email protected]>
AuthorDate: Thu Aug 29 07:39:09 2024 +0300
[improve][broker] Add msgInReplay subscription stat and metric to improve Key_Shared observability (#23224)
(cherry picked from commit 59424a831b3ab973c607ba17e9ec6dcebd6e9ee5)
---
.../broker/service/persistent/MessageRedeliveryController.java | 9 +++++++++
.../persistent/PersistentDispatcherMultipleConsumers.java | 4 ++++
.../pulsar/broker/service/persistent/PersistentSubscription.java | 1 +
.../pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java | 4 ++++
.../broker/stats/prometheus/AggregatedSubscriptionStats.java | 2 ++
.../pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java | 3 +++
.../org/apache/pulsar/broker/stats/prometheus/TopicStats.java | 2 ++
.../apache/pulsar/common/policies/data/SubscriptionStats.java | 3 +++
.../pulsar/common/policies/data/stats/SubscriptionStatsImpl.java | 6 ++++++
9 files changed, 34 insertions(+)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
index 63803177242..5ca052616a5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
@@ -148,4 +148,13 @@ public class MessageRedeliveryController {
public NavigableSet<PositionImpl> getMessagesToReplayNow(int maxMessagesToRead) {
return messagesToRedeliver.items(maxMessagesToRead, PositionImpl::new);
}
+
+ /**
+ * Get the number of messages registered for replay in the redelivery controller.
+ *
+ * @return number of messages
+ */
+ public int size() {
+ return messagesToRedeliver.size();
+ }
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 7d0cebc0870..ae844b57844 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -1270,5 +1270,9 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
return subscription;
}
+ public long getNumberOfMessagesInReplay() {
+ return redeliveryMessages.size();
+ }
+
private static final Logger log = LoggerFactory.getLogger(PersistentDispatcherMultipleConsumers.class);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index aa893db0e5a..538d131ea7c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -1226,6 +1226,7 @@ public class PersistentSubscription extends AbstractSubscription implements Subs
subStats.unackedMessages = d.getTotalUnackedMessages();
subStats.blockedSubscriptionOnUnackedMsgs = d.isBlockedDispatcherOnUnackedMsgs();
subStats.msgDelayed = d.getNumberOfDelayedMessages();
+ subStats.msgInReplay = d.getNumberOfMessagesInReplay();
}
}
subStats.msgBacklog = getNumberOfEntriesInBacklog(getPreciseBacklog);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
index 85ff15c915a..aaaea7b493e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
@@ -43,6 +43,7 @@ public class AggregatedNamespaceStats {
public ManagedLedgerStats managedLedgerStats = new ManagedLedgerStats();
public long msgBacklog;
public long msgDelayed;
+ public long msgInReplay;
public long ongoingTxnCount;
public long abortedTxnCount;
@@ -141,10 +142,12 @@ public class AggregatedNamespaceStats {
AggregatedSubscriptionStats subsStats =
subscriptionStats.computeIfAbsent(n, k -> new AggregatedSubscriptionStats());
msgDelayed += as.msgDelayed;
+ msgInReplay += as.msgInReplay;
subsStats.blockedSubscriptionOnUnackedMsgs = as.blockedSubscriptionOnUnackedMsgs;
subsStats.msgBacklog += as.msgBacklog;
subsStats.msgBacklogNoDelayed += as.msgBacklogNoDelayed;
subsStats.msgDelayed += as.msgDelayed;
+ subsStats.msgInReplay += as.msgInReplay;
subsStats.msgRateRedeliver += as.msgRateRedeliver;
subsStats.unackedMessages += as.unackedMessages;
subsStats.filterProcessedMsgCount += as.filterProcessedMsgCount;
@@ -200,6 +203,7 @@ public class AggregatedNamespaceStats {
msgBacklog = 0;
msgDelayed = 0;
+ msgInReplay = 0;
ongoingTxnCount = 0;
abortedTxnCount = 0;
committedTxnCount = 0;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java
index da0324c5565..b713146f58b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java
@@ -43,6 +43,8 @@ public class AggregatedSubscriptionStats {
public long msgDelayed;
+ public long msgInReplay;
+
long msgOutCounter;
long bytesOutCounter;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
index d25af8d289c..56ea874ac01 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
@@ -134,6 +134,7 @@ public class NamespaceStatsAggregator {
subsStats.msgOutCounter = subscriptionStats.msgOutCounter;
subsStats.msgBacklog = subscriptionStats.msgBacklog;
subsStats.msgDelayed = subscriptionStats.msgDelayed;
+ subsStats.msgInReplay = subscriptionStats.msgInReplay;
subsStats.msgRateExpired = subscriptionStats.msgRateExpired;
subsStats.totalMsgExpired = subscriptionStats.totalMsgExpired;
subsStats.msgBacklogNoDelayed = subsStats.msgBacklog - subsStats.msgDelayed;
@@ -412,6 +413,8 @@ public class NamespaceStatsAggregator {
writeMetric(stream, "pulsar_subscription_delayed", stats.msgDelayed, cluster, namespace);
+ writeMetric(stream, "pulsar_subscription_in_replay", stats.msgInReplay, cluster, namespace);
+
writeMetric(stream, "pulsar_delayed_message_index_size_bytes",
stats.delayedMessageIndexSizeInBytes, cluster,
namespace);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
index e907760d9d9..7f0dac6fee4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
@@ -291,6 +291,8 @@ class TopicStats {
subsStats.msgBacklogNoDelayed, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
writeSubscriptionMetric(stream, "pulsar_subscription_delayed",
subsStats.msgDelayed, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
+ writeSubscriptionMetric(stream, "pulsar_subscription_in_replay",
+ subsStats.msgInReplay, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
writeSubscriptionMetric(stream, "pulsar_subscription_msg_rate_redeliver",
subsStats.msgRateRedeliver, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
writeSubscriptionMetric(stream, "pulsar_subscription_unacked_messages",
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
index 9ff94a2952e..dfa216da9f4 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
@@ -66,6 +66,9 @@ public interface SubscriptionStats {
/** Number of delayed messages currently being tracked. */
long getMsgDelayed();
+ /** Number of messages registered for replay. */
+ long getMsgInReplay();
+
/**
* Number of unacknowledged messages for the subscription, where an unacknowledged message is one that has been
* sent to a consumer but not yet acknowledged. Calculated by summing all {@link ConsumerStats#getUnackedMessages()}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java
index d77764e679d..a8c3c91dffb 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java
@@ -74,6 +74,9 @@ public class SubscriptionStatsImpl implements SubscriptionStats {
/** Number of delayed messages currently being tracked. */
public long msgDelayed;
+ /** Number of messages registered for replay. */
+ public long msgInReplay;
+
/**
* Number of unacknowledged messages for the subscription, where an unacknowledged message is one that has been
* sent to a consumer but not yet acknowledged. Calculated by summing all {@link ConsumerStatsImpl#unackedMessages}
@@ -167,6 +170,8 @@ public class SubscriptionStatsImpl implements SubscriptionStats {
msgBacklog = 0;
backlogSize = 0;
msgBacklogNoDelayed = 0;
+ msgDelayed = 0;
+ msgInReplay = 0;
unackedMessages = 0;
type = null;
msgRateExpired = 0;
@@ -202,6 +207,7 @@ public class SubscriptionStatsImpl implements SubscriptionStats {
this.backlogSize += stats.backlogSize;
this.msgBacklogNoDelayed += stats.msgBacklogNoDelayed;
this.msgDelayed += stats.msgDelayed;
+ this.msgInReplay += stats.msgInReplay;
this.unackedMessages += stats.unackedMessages;
this.type = stats.type;
this.msgRateExpired += stats.msgRateExpired;