This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 4ed43c9121b1a92235effc8c534234dc99a5e67d Author: Sijie Guo <[email protected]> AuthorDate: Tue Jan 5 19:22:59 2021 -0800 Monitor if a cursor moves its mark-delete position (#8930) Motivation: msgBacklog and storageSize do not give a clear indication of whether the mark-delete position has advanced. Add a new metric in SubscriptionStats to monitor whether a subscription's mark-delete position has advanced. (cherry picked from commit 2fd878a8f4bfdd5cffa7fa6450714ec1ad25f514) --- .../pulsar/broker/namespace/NamespaceService.java | 2 +- .../persistent/PersistentMessageExpiryMonitor.java | 5 ++++ .../service/persistent/PersistentSubscription.java | 9 +++++++ .../prometheus/AggregatedSubscriptionStats.java | 8 ++++++ .../stats/prometheus/NamespaceStatsAggregator.java | 6 ++++- .../pulsar/broker/stats/prometheus/TopicStats.java | 8 ++++++ .../apache/pulsar/broker/admin/AdminApiTest.java | 30 +++++++++++++++++----- .../common/policies/data/SubscriptionStats.java | 4 +++ 8 files changed, 63 insertions(+), 9 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index 99b661e..580ca0d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -241,7 +241,7 @@ public class NamespaceService { /** * Return the URL of the broker who's owning a particular service unit in asynchronous way * - * If the service unit is not owned, return a CompletableFuture with empty optional + * If the service unit is not owned, return a CompletableFuture with empty optional. 
*/ public CompletableFuture<Optional<URL>> getWebServiceUrlAsync(ServiceUnitId suName, LookupOptions options) throws Exception { if (suName instanceof TopicName) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java index 315a0a3..502dd2f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.service.persistent; +import java.util.Objects; import java.util.Optional; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.LongAdder; @@ -132,7 +133,11 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback { public void findEntryComplete(Position position, Object ctx) { if (position != null) { log.info("[{}][{}] Expiring all messages until position {}", topicName, subName, position); + Position prevMarkDeletePos = cursor.getMarkDeletedPosition(); cursor.asyncMarkDelete(position, markDeleteCallback, cursor.getNumberOfEntriesInBacklog(false)); + if (!Objects.equals(cursor.getMarkDeletedPosition(), prevMarkDeletePos) && subscription != null) { + subscription.updateLastMarkDeleteAdvancedTimestamp(); + } } else { if (log.isDebugEnabled()) { log.debug("[{}][{}] No messages to expire", topicName, subName); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index e75763f..2c87f02 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -93,6 +93,7 @@ public class PersistentSubscription implements Subscription { private long lastExpireTimestamp = 0L; private long lastConsumedFlowTimestamp = 0L; + private long lastMarkDeleteAdvancedTimestamp = 0L; // for connected subscriptions, message expiry will be checked if the backlog is greater than this threshold private static final int MINIMUM_BACKLOG_FOR_EXPIRY_CHECK = 1000; @@ -139,6 +140,11 @@ public class PersistentSubscription implements Subscription { IS_FENCED_UPDATER.set(this, FALSE); } + public void updateLastMarkDeleteAdvancedTimestamp() { + this.lastMarkDeleteAdvancedTimestamp = + Math.max(this.lastMarkDeleteAdvancedTimestamp, System.currentTimeMillis()); + } + @Override public BrokerInterceptor interceptor() { return topic.getBrokerService().getInterceptor(); @@ -334,6 +340,8 @@ public class PersistentSubscription implements Subscription { } if (!cursor.getMarkDeletedPosition().equals(previousMarkDeletePosition)) { + this.updateLastMarkDeleteAdvancedTimestamp(); + // Mark delete position advance ReplicatedSubscriptionSnapshotCache snapshotCache = this.replicatedSubscriptionSnapshotCache; if (snapshotCache != null) { @@ -896,6 +904,7 @@ public class PersistentSubscription implements Subscription { SubscriptionStats subStats = new SubscriptionStats(); subStats.lastExpireTimestamp = lastExpireTimestamp; subStats.lastConsumedFlowTimestamp = lastConsumedFlowTimestamp; + subStats.lastMarkDeleteAdvancedTimestamp = lastMarkDeleteAdvancedTimestamp; Dispatcher dispatcher = this.dispatcher; if (dispatcher != null) { Map<String, List<String>> consumerKeyHashRanges = getType() == SubType.Key_Shared? 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java index f3573f7..ce07180 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java @@ -47,6 +47,14 @@ public class AggregatedSubscriptionStats { long lastExpireTimestamp; + long lastConsumedFlowTimestamp; + + long lastConsumedTimestamp; + + long lastAckedTimestamp; + + long lastMarkDeleteAdvancedTimestamp; + double msgRateExpired; long totalMsgExpired; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java index 3e52c44..3364214 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java @@ -138,10 +138,14 @@ public class NamespaceStatsAggregator { .computeIfAbsent(subName, k -> new AggregatedSubscriptionStats()); subsStats.msgBacklog = subscriptionStats.msgBacklog; subsStats.msgDelayed = subscriptionStats.msgDelayed; - subsStats.lastExpireTimestamp = subscriptionStats.lastExpireTimestamp; subsStats.msgRateExpired = subscriptionStats.msgRateExpired; subsStats.totalMsgExpired = subscriptionStats.totalMsgExpired; subsStats.msgBacklogNoDelayed = subsStats.msgBacklog - subsStats.msgDelayed; + subsStats.lastExpireTimestamp = subscriptionStats.lastExpireTimestamp; + subsStats.lastAckedTimestamp = subscriptionStats.lastAckedTimestamp; + subsStats.lastConsumedFlowTimestamp = subscriptionStats.lastConsumedFlowTimestamp; + subsStats.lastConsumedTimestamp = 
subscriptionStats.lastConsumedTimestamp; + subsStats.lastMarkDeleteAdvancedTimestamp = subscriptionStats.lastMarkDeleteAdvancedTimestamp; subscriptionStats.consumers.forEach(cStats -> { stats.consumersCount++; subsStats.unackedMessages += cStats.unackedMessages; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java index d27e863..1d39d58 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java @@ -177,6 +177,14 @@ class TopicStats { subsStats.msgOutCounter); metric(stream, cluster, namespace, topic, n, "pulsar_subscription_last_expire_timestamp", subsStats.lastExpireTimestamp); + metric(stream, cluster, namespace, topic, n, "pulsar_subscription_last_acked_timestamp", + subsStats.lastAckedTimestamp); + metric(stream, cluster, namespace, topic, n, "pulsar_subscription_last_consumed_flow_timestamp", + subsStats.lastConsumedFlowTimestamp); + metric(stream, cluster, namespace, topic, n, "pulsar_subscription_last_consumed_timestamp", + subsStats.lastConsumedTimestamp); + metric(stream, cluster, namespace, topic, n, "pulsar_subscription_last_mark_delete_advanced_timestamp", + subsStats.lastMarkDeleteAdvancedTimestamp); metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_rate_expired", subsStats.msgRateExpired); metric(stream, cluster, namespace, topic, n, "pulsar_subscription_total_msg_expired", diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java index ad9939c..67d2117 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java @@ -2073,17 +2073,29 @@ public class 
AdminApiTest extends MockedPulsarServiceBaseTest { Thread.sleep(1000); // wait for 1 seconds to execute expire message as it is async topicStats = admin.topics().getStats("persistent://prop-xyz/ns1/ds2"); - assertEquals(topicStats.subscriptions.get("my-sub1").msgBacklog, 0); - assertEquals(topicStats.subscriptions.get("my-sub2").msgBacklog, 10); - assertEquals(topicStats.subscriptions.get("my-sub3").msgBacklog, 10); + SubscriptionStats subStats1 = topicStats.subscriptions.get("my-sub1"); + assertEquals(subStats1.msgBacklog, 0); + assertTrue(subStats1.lastMarkDeleteAdvancedTimestamp > 0L); + SubscriptionStats subStats2 = topicStats.subscriptions.get("my-sub2"); + assertEquals(subStats2.msgBacklog, 10); + assertEquals(subStats2.lastMarkDeleteAdvancedTimestamp, 0L); + SubscriptionStats subStats3 = topicStats.subscriptions.get("my-sub3"); + assertEquals(subStats3.msgBacklog, 10); + assertEquals(subStats3.lastMarkDeleteAdvancedTimestamp, 0L); admin.topics().expireMessagesForAllSubscriptions("persistent://prop-xyz/ns1/ds2", 1); Thread.sleep(1000); // wait for 1 seconds to execute expire message as it is async topicStats = admin.topics().getStats("persistent://prop-xyz/ns1/ds2"); - assertEquals(topicStats.subscriptions.get("my-sub1").msgBacklog, 0); - assertEquals(topicStats.subscriptions.get("my-sub2").msgBacklog, 0); - assertEquals(topicStats.subscriptions.get("my-sub3").msgBacklog, 0); + SubscriptionStats newSubStats1 = topicStats.subscriptions.get("my-sub1"); + assertEquals(newSubStats1.msgBacklog, 0); + assertEquals(newSubStats1.lastMarkDeleteAdvancedTimestamp, subStats1.lastMarkDeleteAdvancedTimestamp); + SubscriptionStats newSubStats2 = topicStats.subscriptions.get("my-sub2"); + assertEquals(newSubStats2.msgBacklog, 0); + assertTrue(newSubStats2.lastMarkDeleteAdvancedTimestamp > subStats2.lastMarkDeleteAdvancedTimestamp); + SubscriptionStats newSubStats3 = topicStats.subscriptions.get("my-sub3"); + assertEquals(newSubStats3.msgBacklog, 0); + 
assertTrue(newSubStats3.lastMarkDeleteAdvancedTimestamp > subStats3.lastMarkDeleteAdvancedTimestamp); consumer1.close(); consumer2.close(); @@ -2523,6 +2535,9 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest { producer.send(new byte[1024 * i * 5]); } + TopicStats topicStats = admin.topics().getStats(topic); + assertEquals(topicStats.subscriptions.get("sub-1").lastMarkDeleteAdvancedTimestamp, 0L); + for (int i = 0; i < messages; i++) { consumer.acknowledgeCumulative(consumer.receive()); } @@ -2530,8 +2545,9 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest { // Wait ack send Thread.sleep(1000); - TopicStats topicStats = admin.topics().getStats(topic); + topicStats = admin.topics().getStats(topic); assertEquals(topicStats.backlogSize, 0); + assertTrue(topicStats.subscriptions.get("sub-1").lastMarkDeleteAdvancedTimestamp > 0L); } @Test diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java index 12c5767..cd3d6d1 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java @@ -89,6 +89,9 @@ public class SubscriptionStats { /** Last acked message timestamp. */ public long lastAckedTimestamp; + /** Last MarkDelete position advanced timestamp. */ + public long lastMarkDeleteAdvancedTimestamp; + /** List of connected consumers on this subscription w/ their stats. */ public List<ConsumerStats> consumers; @@ -124,6 +127,7 @@ public class SubscriptionStats { msgRateExpired = 0; totalMsgExpired = 0; lastExpireTimestamp = 0L; + lastMarkDeleteAdvancedTimestamp = 0L; consumers.clear(); consumersAfterMarkDeletePosition.clear(); nonContiguousDeletedMessagesRanges = 0;
