This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new 03ea94bbea8 [improve][broker] Add msgInReplay subscription stat and
metric to improve Key_Shared observability (#23224)
03ea94bbea8 is described below
commit 03ea94bbea81494e826cf21a6718aaa4759e8428
Author: Lari Hotari <[email protected]>
AuthorDate: Thu Aug 29 07:39:09 2024 +0300
[improve][broker] Add msgInReplay subscription stat and metric to improve Key_Shared observability (#23224)
(cherry picked from commit 59424a831b3ab973c607ba17e9ec6dcebd6e9ee5)
---
.../broker/service/persistent/MessageRedeliveryController.java | 9 +++++++++
.../persistent/PersistentDispatcherMultipleConsumers.java | 5 ++++-
.../pulsar/broker/service/persistent/PersistentSubscription.java | 1 +
.../pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java | 4 ++++
.../broker/stats/prometheus/AggregatedSubscriptionStats.java | 2 ++
.../pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java | 3 +++
.../org/apache/pulsar/broker/stats/prometheus/TopicStats.java | 2 ++
.../apache/pulsar/common/policies/data/SubscriptionStats.java | 3 +++
.../pulsar/common/policies/data/stats/SubscriptionStatsImpl.java | 6 ++++++
9 files changed, 34 insertions(+), 1 deletion(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
index 63803177242..5ca052616a5 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
@@ -148,4 +148,13 @@ public class MessageRedeliveryController {
public NavigableSet<PositionImpl> getMessagesToReplayNow(int
maxMessagesToRead) {
return messagesToRedeliver.items(maxMessagesToRead, PositionImpl::new);
}
+
+ /**
+ * Get the number of messages registered for replay in the redelivery controller.
+ *
+ * @return number of messages
+ */
+ public int size() {
+ return messagesToRedeliver.size();
+ }
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 6e294cc7db1..2983639c76a 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -135,7 +135,6 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractDispatcherMul
protected final ExecutorService dispatchMessagesThread;
private final SharedConsumerAssignor assignor;
-
protected enum ReadType {
Normal, Replay
}
@@ -1352,5 +1351,9 @@ public class PersistentDispatcherMultipleConsumers
extends AbstractDispatcherMul
return subscription;
}
+ public long getNumberOfMessagesInReplay() {
+ return redeliveryMessages.size();
+ }
+
private static final Logger log =
LoggerFactory.getLogger(PersistentDispatcherMultipleConsumers.class);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index a4f6fdc5427..fde6a0716eb 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -1296,6 +1296,7 @@ public class PersistentSubscription extends
AbstractSubscription {
subStats.unackedMessages = d.getTotalUnackedMessages();
subStats.blockedSubscriptionOnUnackedMsgs =
d.isBlockedDispatcherOnUnackedMsgs();
subStats.msgDelayed = d.getNumberOfDelayedMessages();
+ subStats.msgInReplay = d.getNumberOfMessagesInReplay();
}
}
subStats.msgBacklog =
getNumberOfEntriesInBacklog(getStatsOptions.isGetPreciseBacklog());
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
index 3975cd89cfa..91648c27627 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
@@ -43,6 +43,7 @@ public class AggregatedNamespaceStats {
public ManagedLedgerStats managedLedgerStats = new ManagedLedgerStats();
public long msgBacklog;
public long msgDelayed;
+ public long msgInReplay;
public long ongoingTxnCount;
public long abortedTxnCount;
@@ -140,10 +141,12 @@ public class AggregatedNamespaceStats {
AggregatedSubscriptionStats subsStats =
subscriptionStats.computeIfAbsent(n, k -> new
AggregatedSubscriptionStats());
msgDelayed += as.msgDelayed;
+ msgInReplay += as.msgInReplay;
subsStats.blockedSubscriptionOnUnackedMsgs =
as.blockedSubscriptionOnUnackedMsgs;
subsStats.msgBacklog += as.msgBacklog;
subsStats.msgBacklogNoDelayed += as.msgBacklogNoDelayed;
subsStats.msgDelayed += as.msgDelayed;
+ subsStats.msgInReplay += as.msgInReplay;
subsStats.msgRateRedeliver += as.msgRateRedeliver;
subsStats.unackedMessages += as.unackedMessages;
subsStats.filterProcessedMsgCount += as.filterProcessedMsgCount;
@@ -199,6 +202,7 @@ public class AggregatedNamespaceStats {
msgBacklog = 0;
msgDelayed = 0;
+ msgInReplay = 0;
ongoingTxnCount = 0;
abortedTxnCount = 0;
committedTxnCount = 0;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java
index da0324c5565..b713146f58b 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java
@@ -43,6 +43,8 @@ public class AggregatedSubscriptionStats {
public long msgDelayed;
+ public long msgInReplay;
+
long msgOutCounter;
long bytesOutCounter;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
index 3bbc9100b36..440666c4ec9 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
@@ -134,6 +134,7 @@ public class NamespaceStatsAggregator {
subsStats.msgOutCounter = subscriptionStats.msgOutCounter;
subsStats.msgBacklog = subscriptionStats.msgBacklog;
subsStats.msgDelayed = subscriptionStats.msgDelayed;
+ subsStats.msgInReplay = subscriptionStats.msgInReplay;
subsStats.msgRateExpired = subscriptionStats.msgRateExpired;
subsStats.totalMsgExpired = subscriptionStats.totalMsgExpired;
subsStats.msgBacklogNoDelayed = subsStats.msgBacklog -
subsStats.msgDelayed;
@@ -420,6 +421,8 @@ public class NamespaceStatsAggregator {
writeMetric(stream, "pulsar_subscription_delayed", stats.msgDelayed,
cluster, namespace);
+ writeMetric(stream, "pulsar_subscription_in_replay", stats.msgInReplay, cluster, namespace);
+
writeMetric(stream, "pulsar_delayed_message_index_size_bytes",
stats.delayedMessageIndexSizeInBytes, cluster,
namespace);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
index 9eb4077225c..5d9e21113e6 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
@@ -310,6 +310,8 @@ class TopicStats {
subsStats.msgBacklogNoDelayed, cluster, namespace, topic,
sub, splitTopicAndPartitionIndexLabel);
writeSubscriptionMetric(stream, "pulsar_subscription_delayed",
subsStats.msgDelayed, cluster, namespace, topic, sub,
splitTopicAndPartitionIndexLabel);
+ writeSubscriptionMetric(stream, "pulsar_subscription_in_replay",
+ subsStats.msgInReplay, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
writeSubscriptionMetric(stream,
"pulsar_subscription_msg_rate_redeliver",
subsStats.msgRateRedeliver, cluster, namespace, topic,
sub, splitTopicAndPartitionIndexLabel);
writeSubscriptionMetric(stream,
"pulsar_subscription_unacked_messages",
diff --git
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
index d4850adaa6f..ce3a080a855 100644
---
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
+++
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
@@ -66,6 +66,9 @@ public interface SubscriptionStats {
/** Number of delayed messages currently being tracked. */
long getMsgDelayed();
+ /** Number of messages registered for replay. */
+ long getMsgInReplay();
+
/**
* Number of unacknowledged messages for the subscription, where an
unacknowledged message is one that has been
* sent to a consumer but not yet acknowledged. Calculated by summing all
{@link ConsumerStats#getUnackedMessages()}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java
index a8ea0060629..12734a5586c 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java
@@ -74,6 +74,9 @@ public class SubscriptionStatsImpl implements
SubscriptionStats {
/** Number of delayed messages currently being tracked. */
public long msgDelayed;
+ /** Number of messages registered for replay. */
+ public long msgInReplay;
+
/**
* Number of unacknowledged messages for the subscription, where an
unacknowledged message is one that has been
* sent to a consumer but not yet acknowledged. Calculated by summing all
{@link ConsumerStatsImpl#unackedMessages}
@@ -167,6 +170,8 @@ public class SubscriptionStatsImpl implements
SubscriptionStats {
msgBacklog = 0;
backlogSize = 0;
msgBacklogNoDelayed = 0;
+ msgDelayed = 0;
+ msgInReplay = 0;
unackedMessages = 0;
type = null;
msgRateExpired = 0;
@@ -202,6 +207,7 @@ public class SubscriptionStatsImpl implements
SubscriptionStats {
this.backlogSize += stats.backlogSize;
this.msgBacklogNoDelayed += stats.msgBacklogNoDelayed;
this.msgDelayed += stats.msgDelayed;
+ this.msgInReplay += stats.msgInReplay;
this.unackedMessages += stats.unackedMessages;
this.type = stats.type;
this.msgRateExpired += stats.msgRateExpired;