This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 0e9e93eb72d [fix][broker] fix pulsar-admin topics stats-internal caused a BK client thread a deadlock (#23258)
0e9e93eb72d is described below

commit 0e9e93eb72d48f449de0ffb363b591da80ac52e9
Author: fengyubiao <[email protected]>
AuthorDate: Mon Sep 9 23:32:30 2024 +0800

    [fix][broker] fix pulsar-admin topics stats-internal caused a BK client thread a deadlock (#23258)
    
    (cherry picked from commit 0aaa906cd8c68a212992166221123fd83172ce31)
---
 .../broker/service/persistent/PersistentTopic.java | 29 ++++++++++++++++------
 .../pulsar/compaction/CompactedTopicImpl.java      |  8 ++++--
 2 files changed, 27 insertions(+), 10 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index bbfb5498e26..5b6d551ac8b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -47,6 +47,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
@@ -2679,13 +2680,13 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         info.entries = -1;
         info.size = -1;
 
-        Optional<CompactedTopicContext> compactedTopicContext = getCompactedTopicContext();
-        if (compactedTopicContext.isPresent()) {
-            CompactedTopicContext ledgerContext = compactedTopicContext.get();
-            info.ledgerId = ledgerContext.getLedger().getId();
-            info.entries = ledgerContext.getLedger().getLastAddConfirmed() + 1;
-            info.size = ledgerContext.getLedger().getLength();
-        }
+        futures.add(getCompactedTopicContextAsync().thenAccept(v -> {
+            if (v != null) {
+                info.ledgerId = v.getLedger().getId();
+                info.entries = v.getLedger().getLastAddConfirmed() + 1;
+                info.size = v.getLedger().getLength();
+            }
+        }));
 
         stats.compactedLedger = info;
 
@@ -2804,12 +2805,24 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
             if (topicCompactionService instanceof PulsarTopicCompactionService pulsarCompactedService) {
                 return pulsarCompactedService.getCompactedTopic().getCompactedTopicContext();
             }
-        } catch (ExecutionException | InterruptedException e) {
+        } catch (ExecutionException | InterruptedException | TimeoutException e) {
             log.warn("[{}]Fail to get ledger information for compacted topic.", topic);
         }
         return Optional.empty();
     }
 
+    public CompletableFuture<CompactedTopicContext> getCompactedTopicContextAsync() {
+        if (topicCompactionService instanceof PulsarTopicCompactionService pulsarCompactedService) {
+            CompletableFuture<CompactedTopicContext> res =
+                    pulsarCompactedService.getCompactedTopic().getCompactedTopicContextFuture();
+            if (res == null) {
+                return CompletableFuture.completedFuture(null);
+            }
+            return res;
+        }
+        return CompletableFuture.completedFuture(null);
+    }
+
     public long getBacklogSize() {
         return ledger.getEstimatedBacklogSize();
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
index dfafbc41cb4..149d4691c88 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
@@ -32,6 +32,8 @@ import java.util.NoSuchElementException;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.function.Predicate;
 import javax.annotation.Nullable;
 import org.apache.bookkeeper.client.BKException;
@@ -304,8 +306,10 @@ public class CompactedTopicImpl implements CompactedTopic {
      * Getter for CompactedTopicContext.
      * @return CompactedTopicContext
      */
-    public Optional<CompactedTopicContext> getCompactedTopicContext() throws ExecutionException, InterruptedException {
-        return compactedTopicContext == null ? Optional.empty() : Optional.of(compactedTopicContext.get());
+    public Optional<CompactedTopicContext> getCompactedTopicContext() throws ExecutionException, InterruptedException,
+            TimeoutException {
+        return compactedTopicContext == null ? Optional.empty() :
+                Optional.of(compactedTopicContext.get(30, TimeUnit.SECONDS));
     }
 
     @Override

Reply via email to