This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new 0e9e93eb72d [fix][broker] fix pulsar-admin topics stats-internal
caused a BK client thread a deadlock (#23258)
0e9e93eb72d is described below
commit 0e9e93eb72d48f449de0ffb363b591da80ac52e9
Author: fengyubiao <[email protected]>
AuthorDate: Mon Sep 9 23:32:30 2024 +0800
[fix][broker] fix pulsar-admin topics stats-internal caused a BK client
thread a deadlock (#23258)
(cherry picked from commit 0aaa906cd8c68a212992166221123fd83172ce31)
---
.../broker/service/persistent/PersistentTopic.java | 29 ++++++++++++++++------
.../pulsar/compaction/CompactedTopicImpl.java | 8 ++++--
2 files changed, 27 insertions(+), 10 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index bbfb5498e26..5b6d551ac8b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -47,6 +47,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -2679,13 +2680,13 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
info.entries = -1;
info.size = -1;
- Optional<CompactedTopicContext> compactedTopicContext =
getCompactedTopicContext();
- if (compactedTopicContext.isPresent()) {
- CompactedTopicContext ledgerContext = compactedTopicContext.get();
- info.ledgerId = ledgerContext.getLedger().getId();
- info.entries = ledgerContext.getLedger().getLastAddConfirmed() + 1;
- info.size = ledgerContext.getLedger().getLength();
- }
+ futures.add(getCompactedTopicContextAsync().thenAccept(v -> {
+ if (v != null) {
+ info.ledgerId = v.getLedger().getId();
+ info.entries = v.getLedger().getLastAddConfirmed() + 1;
+ info.size = v.getLedger().getLength();
+ }
+ }));
stats.compactedLedger = info;
@@ -2804,12 +2805,24 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
if (topicCompactionService instanceof PulsarTopicCompactionService
pulsarCompactedService) {
return
pulsarCompactedService.getCompactedTopic().getCompactedTopicContext();
}
- } catch (ExecutionException | InterruptedException e) {
+ } catch (ExecutionException | InterruptedException | TimeoutException
e) {
log.warn("[{}]Fail to get ledger information for compacted
topic.", topic);
}
return Optional.empty();
}
+ public CompletableFuture<CompactedTopicContext>
getCompactedTopicContextAsync() {
+ if (topicCompactionService instanceof PulsarTopicCompactionService
pulsarCompactedService) {
+ CompletableFuture<CompactedTopicContext> res =
+
pulsarCompactedService.getCompactedTopic().getCompactedTopicContextFuture();
+ if (res == null) {
+ return CompletableFuture.completedFuture(null);
+ }
+ return res;
+ }
+ return CompletableFuture.completedFuture(null);
+ }
+
public long getBacklogSize() {
return ledger.getEstimatedBacklogSize();
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
index dfafbc41cb4..149d4691c88 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
@@ -32,6 +32,8 @@ import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import javax.annotation.Nullable;
import org.apache.bookkeeper.client.BKException;
@@ -304,8 +306,10 @@ public class CompactedTopicImpl implements CompactedTopic {
* Getter for CompactedTopicContext.
* @return CompactedTopicContext
*/
- public Optional<CompactedTopicContext> getCompactedTopicContext() throws
ExecutionException, InterruptedException {
- return compactedTopicContext == null ? Optional.empty() :
Optional.of(compactedTopicContext.get());
+ public Optional<CompactedTopicContext> getCompactedTopicContext() throws
ExecutionException, InterruptedException,
+ TimeoutException {
+ return compactedTopicContext == null ? Optional.empty() :
+ Optional.of(compactedTopicContext.get(30, TimeUnit.SECONDS));
}
@Override