This is an automated email from the ASF dual-hosted git repository.

mattisonchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 86f8a844091 [fix][broker] topic policy deadlock block metadata thread. (#23786)
86f8a844091 is described below

commit 86f8a84409116e5e42a9463189c0dedaf5cd291a
Author: Qiang Zhao <[email protected]>
AuthorDate: Mon Dec 30 23:57:27 2024 +0800

    [fix][broker] topic policy deadlock block metadata thread. (#23786)
---
 .../SystemTopicBasedTopicPoliciesService.java      | 55 +++++++++++-----------
 1 file changed, 27 insertions(+), 28 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index cc3938491e6..5488d5563f6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -37,6 +37,9 @@ import java.util.concurrent.atomic.AtomicInteger;
 import javax.annotation.Nonnull;
 import org.apache.commons.lang3.concurrent.ConcurrentInitializer;
 import org.apache.commons.lang3.concurrent.LazyInitializer;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener;
@@ -267,37 +270,33 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
             return CompletableFuture.completedFuture(Optional.empty());
         }
         final CompletableFuture<Boolean> preparedFuture = 
prepareInitPoliciesCacheAsync(topicName.getNamespaceObject());
-        final var resultFuture = new 
CompletableFuture<Optional<TopicPolicies>>();
-        preparedFuture.thenAccept(inserted -> 
policyCacheInitMap.compute(namespace, (___, existingFuture) -> {
-            if (!inserted || existingFuture != null) {
-                final var partitionedTopicName = 
TopicName.get(topicName.getPartitionedTopicName());
-                final var policies = Optional.ofNullable(switch (type) {
-                    case DEFAULT -> 
Optional.ofNullable(policiesCache.get(partitionedTopicName))
-                            .orElseGet(() -> 
globalPoliciesCache.get(partitionedTopicName));
-                    case GLOBAL_ONLY -> 
globalPoliciesCache.get(partitionedTopicName);
-                    case LOCAL_ONLY -> policiesCache.get(partitionedTopicName);
-                });
-                resultFuture.complete(policies);
-            } else {
-                CompletableFuture.runAsync(() -> {
-                    log.info("The future of {} has been removed from cache, 
retry getTopicPolicies again", namespace);
-                    // Call it in another thread to avoid recursive update 
because getTopicPoliciesAsync() could call
-                    // policyCacheInitMap.computeIfAbsent()
-                    getTopicPoliciesAsync(topicName, 
type).whenComplete((result, e) -> {
-                        if (e == null) {
-                            resultFuture.complete(result);
-                        } else {
-                            resultFuture.completeExceptionally(e);
-                        }
+        // switch thread to avoid potential metadata thread cost and recursive 
deadlock
+        return preparedFuture.thenComposeAsync(inserted -> {
+            // initialized : policies
+            final Mutable<Pair<Boolean, Optional<TopicPolicies>>> 
policiesFutureHolder = new MutableObject<>();
+            // NOTICE: avoid using any callback with lock scope to avoid 
deadlock
+            policyCacheInitMap.compute(namespace, (___, existingFuture) -> {
+                if (!inserted || existingFuture != null) {
+                    final var partitionedTopicName = 
TopicName.get(topicName.getPartitionedTopicName());
+                    final var policies = Optional.ofNullable(switch (type) {
+                        case DEFAULT -> 
Optional.ofNullable(policiesCache.get(partitionedTopicName))
+                                .orElseGet(() -> 
globalPoliciesCache.get(partitionedTopicName));
+                        case GLOBAL_ONLY -> 
globalPoliciesCache.get(partitionedTopicName);
+                        case LOCAL_ONLY -> 
policiesCache.get(partitionedTopicName);
                     });
-                });
+                    policiesFutureHolder.setValue(Pair.of(true, policies));
+                } else {
+                    policiesFutureHolder.setValue(Pair.of(false, null));
+                }
+                return existingFuture;
+            });
+            final var p = policiesFutureHolder.getValue();
+            if (!p.getLeft()) {
+                log.info("The future of {} has been removed from cache, retry 
getTopicPolicies again", namespace);
+                return getTopicPoliciesAsync(topicName, type);
             }
-            return existingFuture;
-        })).exceptionally(e -> {
-            resultFuture.completeExceptionally(e);
-            return null;
+            return CompletableFuture.completedFuture(p.getRight());
         });
-        return resultFuture;
     }
 
     public void addOwnedNamespaceBundleAsync(NamespaceBundle namespaceBundle) {

Reply via email to