This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 9ea2a68cd56 [fix] [broker] Remove blocking calls from Subscription.getStats (#23088)
9ea2a68cd56 is described below

commit 9ea2a68cd560b08875ddcefc9a8f31536a293dcf
Author: fengyubiao <[email protected]>
AuthorDate: Mon Jul 29 18:47:33 2024 +0800

    [fix] [broker] Remove blocking calls from Subscription.getStats (#23088)
    
    (cherry picked from commit e59cd05881bff11e4b127ed3496a02a0ce697fb7)
---
 .../service/persistent/PersistentSubscription.java | 53 +++++++++-----
 .../broker/service/persistent/PersistentTopic.java | 82 ++++++++++++----------
 2 files changed, 80 insertions(+), 55 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index f745f95b3dc..28ebbde211e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -32,6 +32,8 @@ import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
@@ -1128,8 +1130,29 @@ public class PersistentSubscription extends AbstractSubscription implements Subs
         return cursor.getEstimatedSizeSinceMarkDeletePosition();
     }
 
+    /**
+     * @deprecated please call {@link #getStatsAsync(Boolean, boolean, boolean)}.
+     */
+    @Deprecated
     public SubscriptionStatsImpl getStats(Boolean getPreciseBacklog, boolean subscriptionBacklogSize,
                                           boolean getEarliestTimeInBacklog) {
+        // So far, there is no case hits this check.
+        if (getEarliestTimeInBacklog) {
+            throw new IllegalArgumentException("Calling the sync method subscription.getStats with"
+                    + " getEarliestTimeInBacklog, it may encountered a deadlock error.");
+        }
+        // The method "getStatsAsync" will be a sync method if the param "isGetEarliestTimeInBacklog" is false.
+        try {
+            return getStatsAsync(getPreciseBacklog, subscriptionBacklogSize, false).get(5, TimeUnit.SECONDS);
+        } catch (InterruptedException | ExecutionException | TimeoutException e) {
+            // This error will never occur.
+            throw new RuntimeException(e);
+        }
+    }
+
+    public CompletableFuture<SubscriptionStatsImpl> getStatsAsync(Boolean getPreciseBacklog,
+                                                                  boolean subscriptionBacklogSize,
+                                                                  boolean getEarliestTimeInBacklog) {
         SubscriptionStatsImpl subStats = new SubscriptionStatsImpl();
         subStats.lastExpireTimestamp = lastExpireTimestamp;
         subStats.lastConsumedFlowTimestamp = lastConsumedFlowTimestamp;
@@ -1200,21 +1223,6 @@ public class PersistentSubscription extends AbstractSubscription implements Subs
         } else {
             subStats.backlogSize = -1;
         }
-        if (getEarliestTimeInBacklog) {
-            if (subStats.msgBacklog > 0) {
-                ManagedLedgerImpl managedLedger = ((ManagedLedgerImpl) cursor.getManagedLedger());
-                PositionImpl markDeletedPosition = (PositionImpl) cursor.getMarkDeletedPosition();
-                long result = 0;
-                try {
-                    result = managedLedger.getEarliestMessagePublishTimeOfPos(markDeletedPosition).get();
-                } catch (InterruptedException | ExecutionException e) {
-                    result = -1;
-                }
-                subStats.earliestMsgPublishTimeInBacklog = result;
-            } else {
-                subStats.earliestMsgPublishTimeInBacklog = -1;
-            }
-        }
         subStats.msgBacklogNoDelayed = subStats.msgBacklog - subStats.msgDelayed;
         subStats.msgRateExpired = expiryMonitor.getMessageExpiryRate();
         subStats.totalMsgExpired = expiryMonitor.getTotalMessageExpired();
@@ -1239,7 +1247,20 @@ public class PersistentSubscription extends AbstractSubscription implements Subs
         subStats.nonContiguousDeletedMessagesRanges = cursor.getTotalNonContiguousDeletedMessagesRange();
         subStats.nonContiguousDeletedMessagesRangesSerializedSize =
                 cursor.getNonContiguousDeletedMessagesRangeSerializedSize();
-        return subStats;
+        if (!getEarliestTimeInBacklog) {
+            return CompletableFuture.completedFuture(subStats);
+        }
+        if (subStats.msgBacklog > 0) {
+            ManagedLedgerImpl managedLedger = ((ManagedLedgerImpl) cursor.getManagedLedger());
+            PositionImpl markDeletedPosition = (PositionImpl) cursor.getMarkDeletedPosition();
+            return managedLedger.getEarliestMessagePublishTimeOfPos(markDeletedPosition).thenApply(v -> {
+                subStats.earliestMsgPublishTimeInBacklog = v;
+                return subStats;
+            });
+        } else {
+            subStats.earliestMsgPublishTimeInBacklog = -1;
+            return CompletableFuture.completedFuture(subStats);
+        }
     }
 
     @Override
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 5de1a2a331e..40b9c46ac44 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -2420,7 +2420,6 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
     public CompletableFuture<TopicStatsImpl> asyncGetStats(boolean getPreciseBacklog, boolean subscriptionBacklogSize,
                                                            boolean getEarliestTimeInBacklog) {
 
-        CompletableFuture<TopicStatsImpl> statsFuture = new CompletableFuture<>();
         TopicStatsImpl stats = new TopicStatsImpl();
 
         ObjectObjectHashMap<String, PublisherStatsImpl> remotePublishersStats = new ObjectObjectHashMap<>();
@@ -2449,29 +2448,6 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         stats.abortedTxnCount = txnBuffer.getAbortedTxnCount();
         stats.committedTxnCount = txnBuffer.getCommittedTxnCount();
 
-        subscriptions.forEach((name, subscription) -> {
-            SubscriptionStatsImpl subStats =
-                    subscription.getStats(getPreciseBacklog, subscriptionBacklogSize, getEarliestTimeInBacklog);
-
-            stats.msgRateOut += subStats.msgRateOut;
-            stats.msgThroughputOut += subStats.msgThroughputOut;
-            stats.bytesOutCounter += subStats.bytesOutCounter;
-            stats.msgOutCounter += subStats.msgOutCounter;
-            stats.subscriptions.put(name, subStats);
-            stats.nonContiguousDeletedMessagesRanges += subStats.nonContiguousDeletedMessagesRanges;
-            stats.nonContiguousDeletedMessagesRangesSerializedSize +=
-                    subStats.nonContiguousDeletedMessagesRangesSerializedSize;
-            stats.delayedMessageIndexSizeInBytes += subStats.delayedMessageIndexSizeInBytes;
-
-            subStats.bucketDelayedIndexStats.forEach((k, v) -> {
-                TopicMetricBean topicMetricBean =
-                        stats.bucketDelayedIndexStats.computeIfAbsent(k, __ -> new TopicMetricBean());
-                topicMetricBean.name = v.name;
-                topicMetricBean.labelsAndValues = v.labelsAndValues;
-                topicMetricBean.value += v.value;
-            });
-        });
-
         replicators.forEach((cluster, replicator) -> {
             ReplicatorStatsImpl replicatorStats = replicator.getStats();
 
@@ -2521,21 +2497,49 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
             return compactionRecord;
         });
 
-        if (getEarliestTimeInBacklog && stats.backlogSize != 0) {
-            ledger.getEarliestMessagePublishTimeInBacklog().whenComplete((earliestTime, e) -> {
-                if (e != null) {
-                    log.error("[{}] Failed to get earliest message publish time in backlog", topic, e);
-                    statsFuture.completeExceptionally(e);
-                } else {
-                    stats.earliestMsgPublishTimeInBacklogs = earliestTime;
-                    statsFuture.complete(stats);
-                }
-            });
-        } else {
-            statsFuture.complete(stats);
-        }
-
-        return statsFuture;
+        Map<String, CompletableFuture<SubscriptionStatsImpl>> subscriptionFutures = new HashMap<>();
+        subscriptions.forEach((name, subscription) -> {
+            subscriptionFutures.put(name, subscription.getStatsAsync(getPreciseBacklog, subscriptionBacklogSize,
+                    getEarliestTimeInBacklog));
+        });
+        return FutureUtil.waitForAll(subscriptionFutures.values()).thenCompose(ignore -> {
+            for (Map.Entry<String, CompletableFuture<SubscriptionStatsImpl>> e : subscriptionFutures.entrySet()) {
+                String name = e.getKey();
+                SubscriptionStatsImpl subStats = e.getValue().join();
+                stats.msgRateOut += subStats.msgRateOut;
+                stats.msgThroughputOut += subStats.msgThroughputOut;
+                stats.bytesOutCounter += subStats.bytesOutCounter;
+                stats.msgOutCounter += subStats.msgOutCounter;
+                stats.subscriptions.put(name, subStats);
+                stats.nonContiguousDeletedMessagesRanges += subStats.nonContiguousDeletedMessagesRanges;
+                stats.nonContiguousDeletedMessagesRangesSerializedSize +=
+                        subStats.nonContiguousDeletedMessagesRangesSerializedSize;
+                stats.delayedMessageIndexSizeInBytes += subStats.delayedMessageIndexSizeInBytes;
+
+                subStats.bucketDelayedIndexStats.forEach((k, v) -> {
+                    TopicMetricBean topicMetricBean =
+                            stats.bucketDelayedIndexStats.computeIfAbsent(k, __ -> new TopicMetricBean());
+                    topicMetricBean.name = v.name;
+                    topicMetricBean.labelsAndValues = v.labelsAndValues;
+                    topicMetricBean.value += v.value;
+                });
+            }
+            if (getEarliestTimeInBacklog && stats.backlogSize != 0) {
+                CompletableFuture finalRes = ledger.getEarliestMessagePublishTimeInBacklog()
+                    .thenApply((earliestTime) -> {
+                        stats.earliestMsgPublishTimeInBacklogs = earliestTime;
+                        return stats;
+                    });
+                // print error log.
+                finalRes.exceptionally(ex -> {
+                    log.error("[{}] Failed to get earliest message publish time in backlog", topic, ex);
+                    return null;
+                });
+                return finalRes;
+            } else {
+                return CompletableFuture.completedFuture(stats);
+            }
+        });
     }
 
     private Optional<CompactorMXBean> getCompactorMXBean() {

Reply via email to