This is an automated email from the ASF dual-hosted git repository.
mattisonchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 86f8a844091 [fix][broker] topic policy deadlock block metadata thread.
(#23786)
86f8a844091 is described below
commit 86f8a84409116e5e42a9463189c0dedaf5cd291a
Author: Qiang Zhao <[email protected]>
AuthorDate: Mon Dec 30 23:57:27 2024 +0800
[fix][broker] topic policy deadlock block metadata thread. (#23786)
---
.../SystemTopicBasedTopicPoliciesService.java | 55 +++++++++++-----------
1 file changed, 27 insertions(+), 28 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index cc3938491e6..5488d5563f6 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -37,6 +37,9 @@ import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nonnull;
import org.apache.commons.lang3.concurrent.ConcurrentInitializer;
import org.apache.commons.lang3.concurrent.LazyInitializer;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener;
@@ -267,37 +270,33 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
return CompletableFuture.completedFuture(Optional.empty());
}
final CompletableFuture<Boolean> preparedFuture =
prepareInitPoliciesCacheAsync(topicName.getNamespaceObject());
- final var resultFuture = new
CompletableFuture<Optional<TopicPolicies>>();
- preparedFuture.thenAccept(inserted ->
policyCacheInitMap.compute(namespace, (___, existingFuture) -> {
- if (!inserted || existingFuture != null) {
- final var partitionedTopicName =
TopicName.get(topicName.getPartitionedTopicName());
- final var policies = Optional.ofNullable(switch (type) {
- case DEFAULT ->
Optional.ofNullable(policiesCache.get(partitionedTopicName))
- .orElseGet(() ->
globalPoliciesCache.get(partitionedTopicName));
- case GLOBAL_ONLY ->
globalPoliciesCache.get(partitionedTopicName);
- case LOCAL_ONLY -> policiesCache.get(partitionedTopicName);
- });
- resultFuture.complete(policies);
- } else {
- CompletableFuture.runAsync(() -> {
- log.info("The future of {} has been removed from cache,
retry getTopicPolicies again", namespace);
- // Call it in another thread to avoid recursive update
because getTopicPoliciesAsync() could call
- // policyCacheInitMap.computeIfAbsent()
- getTopicPoliciesAsync(topicName,
type).whenComplete((result, e) -> {
- if (e == null) {
- resultFuture.complete(result);
- } else {
- resultFuture.completeExceptionally(e);
- }
+ // switch thread to avoid potential metadata thread cost and recursive
deadlock
+ return preparedFuture.thenComposeAsync(inserted -> {
+ // initialized : policies
+ final Mutable<Pair<Boolean, Optional<TopicPolicies>>>
policiesFutureHolder = new MutableObject<>();
+ // NOTICE: avoid using any callback with lock scope to avoid
deadlock
+ policyCacheInitMap.compute(namespace, (___, existingFuture) -> {
+ if (!inserted || existingFuture != null) {
+ final var partitionedTopicName =
TopicName.get(topicName.getPartitionedTopicName());
+ final var policies = Optional.ofNullable(switch (type) {
+ case DEFAULT ->
Optional.ofNullable(policiesCache.get(partitionedTopicName))
+ .orElseGet(() ->
globalPoliciesCache.get(partitionedTopicName));
+ case GLOBAL_ONLY ->
globalPoliciesCache.get(partitionedTopicName);
+ case LOCAL_ONLY ->
policiesCache.get(partitionedTopicName);
});
- });
+ policiesFutureHolder.setValue(Pair.of(true, policies));
+ } else {
+ policiesFutureHolder.setValue(Pair.of(false, null));
+ }
+ return existingFuture;
+ });
+ final var p = policiesFutureHolder.getValue();
+ if (!p.getLeft()) {
+ log.info("The future of {} has been removed from cache, retry
getTopicPolicies again", namespace);
+ return getTopicPoliciesAsync(topicName, type);
}
- return existingFuture;
- })).exceptionally(e -> {
- resultFuture.completeExceptionally(e);
- return null;
+ return CompletableFuture.completedFuture(p.getRight());
});
- return resultFuture;
}
public void addOwnedNamespaceBundleAsync(NamespaceBundle namespaceBundle) {