This is an automated email from the ASF dual-hosted git repository.

zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e59cd05881b [fix] [broker] Remove blocking calls from 
Subscription.getStats (#23088)
e59cd05881b is described below

commit e59cd05881bff11e4b127ed3496a02a0ce697fb7
Author: fengyubiao <[email protected]>
AuthorDate: Mon Jul 29 18:47:33 2024 +0800

    [fix] [broker] Remove blocking calls from Subscription.getStats (#23088)
---
 .../service/persistent/PersistentSubscription.java | 51 +++++++++----
 .../broker/service/persistent/PersistentTopic.java | 86 +++++++++++-----------
 2 files changed, 80 insertions(+), 57 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 77aa5f82c39..2dd890cfd29 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -32,6 +32,8 @@ import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
@@ -1200,7 +1202,26 @@ public class PersistentSubscription extends 
AbstractSubscription {
         return cursor.getEstimatedSizeSinceMarkDeletePosition();
     }
 
+    /**
+     * @deprecated please call {@link #getStatsAsync(GetStatsOptions)}.
+     */
+    @Deprecated
     public SubscriptionStatsImpl getStats(GetStatsOptions getStatsOptions) {
+        // So far, there is no case hits this check.
+        if (getStatsOptions.isGetEarliestTimeInBacklog()) {
+            throw new IllegalArgumentException("Calling the sync method 
subscription.getStats with"
+                    + " getEarliestTimeInBacklog, it may encountered a 
deadlock error.");
+        }
+        // The method "getStatsAsync" will be a sync method if the param 
"isGetEarliestTimeInBacklog" is false.
+        try {
+            return getStatsAsync(getStatsOptions).get(5, TimeUnit.SECONDS);
+        } catch (InterruptedException | ExecutionException | TimeoutException 
e) {
+            // This error will never occur.
+            throw new RuntimeException(e);
+        }
+    }
+
+    public CompletableFuture<SubscriptionStatsImpl> 
getStatsAsync(GetStatsOptions getStatsOptions) {
         SubscriptionStatsImpl subStats = new SubscriptionStatsImpl();
         subStats.lastExpireTimestamp = lastExpireTimestamp;
         subStats.lastConsumedFlowTimestamp = lastConsumedFlowTimestamp;
@@ -1273,21 +1294,6 @@ public class PersistentSubscription extends 
AbstractSubscription {
         } else {
             subStats.backlogSize = -1;
         }
-        if (getStatsOptions.isGetEarliestTimeInBacklog()) {
-            if (subStats.msgBacklog > 0) {
-                ManagedLedgerImpl managedLedger = ((ManagedLedgerImpl) 
cursor.getManagedLedger());
-                Position markDeletedPosition = cursor.getMarkDeletedPosition();
-                long result = 0;
-                try {
-                    result = 
managedLedger.getEarliestMessagePublishTimeOfPos(markDeletedPosition).get();
-                } catch (InterruptedException | ExecutionException e) {
-                    result = -1;
-                }
-                subStats.earliestMsgPublishTimeInBacklog = result;
-            } else {
-                subStats.earliestMsgPublishTimeInBacklog = -1;
-            }
-        }
         subStats.msgBacklogNoDelayed = subStats.msgBacklog - 
subStats.msgDelayed;
         subStats.msgRateExpired = expiryMonitor.getMessageExpiryRate();
         subStats.totalMsgExpired = expiryMonitor.getTotalMessageExpired();
@@ -1329,7 +1335,20 @@ public class PersistentSubscription extends 
AbstractSubscription {
         subStats.nonContiguousDeletedMessagesRanges = 
cursor.getTotalNonContiguousDeletedMessagesRange();
         subStats.nonContiguousDeletedMessagesRangesSerializedSize =
                 cursor.getNonContiguousDeletedMessagesRangeSerializedSize();
-        return subStats;
+        if (!getStatsOptions.isGetEarliestTimeInBacklog()) {
+            return CompletableFuture.completedFuture(subStats);
+        }
+        if (subStats.msgBacklog > 0) {
+            ManagedLedgerImpl managedLedger = ((ManagedLedgerImpl) 
cursor.getManagedLedger());
+            Position markDeletedPosition = cursor.getMarkDeletedPosition();
+            return 
managedLedger.getEarliestMessagePublishTimeOfPos(markDeletedPosition).thenApply(v
 -> {
+                subStats.earliestMsgPublishTimeInBacklog = v;
+                return subStats;
+            });
+        } else {
+            subStats.earliestMsgPublishTimeInBacklog = -1;
+            return CompletableFuture.completedFuture(subStats);
+        }
     }
 
     @Override
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 3587dab7755..42487d7239c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -2584,7 +2584,6 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
     @Override
     public CompletableFuture<? extends TopicStatsImpl> 
asyncGetStats(GetStatsOptions getStatsOptions) {
 
-        CompletableFuture<TopicStatsImpl> statsFuture = new 
CompletableFuture<>();
         TopicStatsImpl stats = new TopicStatsImpl();
 
         ObjectObjectHashMap<String, PublisherStatsImpl> remotePublishersStats 
= new ObjectObjectHashMap<>();
@@ -2617,32 +2616,6 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
         stats.abortedTxnCount = txnBuffer.getAbortedTxnCount();
         stats.committedTxnCount = txnBuffer.getCommittedTxnCount();
 
-        subscriptions.forEach((name, subscription) -> {
-            SubscriptionStatsImpl subStats = 
subscription.getStats(getStatsOptions);
-
-            stats.msgRateOut += subStats.msgRateOut;
-            stats.msgThroughputOut += subStats.msgThroughputOut;
-            stats.bytesOutCounter += subStats.bytesOutCounter;
-            stats.msgOutCounter += subStats.msgOutCounter;
-            stats.subscriptions.put(name, subStats);
-            stats.nonContiguousDeletedMessagesRanges += 
subStats.nonContiguousDeletedMessagesRanges;
-            stats.nonContiguousDeletedMessagesRangesSerializedSize +=
-                    subStats.nonContiguousDeletedMessagesRangesSerializedSize;
-            stats.delayedMessageIndexSizeInBytes += 
subStats.delayedMessageIndexSizeInBytes;
-
-            subStats.bucketDelayedIndexStats.forEach((k, v) -> {
-                TopicMetricBean topicMetricBean =
-                        stats.bucketDelayedIndexStats.computeIfAbsent(k, __ -> 
new TopicMetricBean());
-                topicMetricBean.name = v.name;
-                topicMetricBean.labelsAndValues = v.labelsAndValues;
-                topicMetricBean.value += v.value;
-            });
-
-            if (isSystemCursor(name) || 
name.startsWith(SystemTopicNames.SYSTEM_READER_PREFIX)) {
-                stats.bytesOutInternalCounter += subStats.bytesOutCounter;
-            }
-        });
-
         replicators.forEach((cluster, replicator) -> {
             ReplicatorStatsImpl replicatorStats = replicator.computeStats();
 
@@ -2692,21 +2665,52 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
             return compactionRecord;
         });
 
-        if (getStatsOptions.isGetEarliestTimeInBacklog() && stats.backlogSize 
!= 0) {
-            
ledger.getEarliestMessagePublishTimeInBacklog().whenComplete((earliestTime, e) 
-> {
-                if (e != null) {
-                    log.error("[{}] Failed to get earliest message publish 
time in backlog", topic, e);
-                    statsFuture.completeExceptionally(e);
-                } else {
-                    stats.earliestMsgPublishTimeInBacklogs = earliestTime;
-                    statsFuture.complete(stats);
-                }
-            });
-        } else {
-            statsFuture.complete(stats);
-        }
+        Map<String, CompletableFuture<SubscriptionStatsImpl>> 
subscriptionFutures = new HashMap<>();
+        subscriptions.forEach((name, subscription) -> {
+            subscriptionFutures.put(name, 
subscription.getStatsAsync(getStatsOptions));
+        });
+        return 
FutureUtil.waitForAll(subscriptionFutures.values()).thenCompose(ignore -> {
+            for (Map.Entry<String, CompletableFuture<SubscriptionStatsImpl>> e 
: subscriptionFutures.entrySet()) {
+                String name = e.getKey();
+                SubscriptionStatsImpl subStats = e.getValue().join();
+                stats.msgRateOut += subStats.msgRateOut;
+                stats.msgThroughputOut += subStats.msgThroughputOut;
+                stats.bytesOutCounter += subStats.bytesOutCounter;
+                stats.msgOutCounter += subStats.msgOutCounter;
+                stats.subscriptions.put(name, subStats);
+                stats.nonContiguousDeletedMessagesRanges += 
subStats.nonContiguousDeletedMessagesRanges;
+                stats.nonContiguousDeletedMessagesRangesSerializedSize +=
+                        
subStats.nonContiguousDeletedMessagesRangesSerializedSize;
+                stats.delayedMessageIndexSizeInBytes += 
subStats.delayedMessageIndexSizeInBytes;
+
+                subStats.bucketDelayedIndexStats.forEach((k, v) -> {
+                    TopicMetricBean topicMetricBean =
+                            stats.bucketDelayedIndexStats.computeIfAbsent(k, 
ignore2 -> new TopicMetricBean());
+                    topicMetricBean.name = v.name;
+                    topicMetricBean.labelsAndValues = v.labelsAndValues;
+                    topicMetricBean.value += v.value;
+                });
 
-        return statsFuture;
+                if (isSystemCursor(name) || 
name.startsWith(SystemTopicNames.SYSTEM_READER_PREFIX)) {
+                    stats.bytesOutInternalCounter += subStats.bytesOutCounter;
+                }
+            }
+            if (getStatsOptions.isGetEarliestTimeInBacklog() && 
stats.backlogSize != 0) {
+                CompletableFuture finalRes = 
ledger.getEarliestMessagePublishTimeInBacklog()
+                    .thenApply((earliestTime) -> {
+                        stats.earliestMsgPublishTimeInBacklogs = earliestTime;
+                        return stats;
+                    });
+                // print error log.
+                finalRes.exceptionally(ex -> {
+                    log.error("[{}] Failed to get earliest message publish 
time in backlog", topic, ex);
+                    return null;
+                });
+                return finalRes;
+            } else {
+                return CompletableFuture.completedFuture(stats);
+            }
+        });
     }
 
     private Optional<CompactorMXBean> getCompactorMXBean() {

Reply via email to