This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit c71f4c431b3d07baed9d1f77acf573e66dcb48c4 Author: fengyubiao <[email protected]> AuthorDate: Thu May 18 11:15:53 2023 +0800 [fix][broker]Fix deadlock of metadata store (#20189) Motivation: The task loadOrCreatePersistentTopic occupies the event thread of the ZK client and calls bundlesCache.synchronous().get(nsname), which is a blocking method, so other ZK tasks (including the task itself) can never finish. Modification: Since the method getBundle(topic) will eventually call the method bundlesCache.synchronous().get(nsname), use getBundleAsync(topic) instead of getBundle(topic) to avoid blocking the thread. --- .../pulsar/broker/namespace/NamespaceService.java | 32 ++++++++++++---------- .../pulsar/broker/namespace/OwnershipCache.java | 6 ++-- 2 files changed, 21 insertions(+), 17 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index 700819e3f2a..4b2df9c5487 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -38,7 +38,9 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Matcher; @@ -1009,26 +1011,28 @@ public class NamespaceService implements AutoCloseable { new IllegalArgumentException("Invalid class of NamespaceBundle: " + suName.getClass().getName())); } + /** + * @Deprecated This method is only used in test now. 
+ */ + @Deprecated public boolean isServiceUnitActive(TopicName topicName) { try { - OwnedBundle ownedBundle = ownershipCache.getOwnedBundle(getBundle(topicName)); - if (ownedBundle == null) { - return false; - } - return ownedBundle.isActive(); - } catch (Exception e) { - LOG.warn("Unable to find OwnedBundle for topic - [{}]", topicName, e); - return false; + return isServiceUnitActiveAsync(topicName).get(pulsar.getConfig() + .getMetadataStoreOperationTimeoutSeconds(), SECONDS); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + LOG.warn("Unable to find OwnedBundle for topic in time - [{}]", topicName, e); + throw new RuntimeException(e); } } public CompletableFuture<Boolean> isServiceUnitActiveAsync(TopicName topicName) { - Optional<CompletableFuture<OwnedBundle>> res = ownershipCache.getOwnedBundleAsync(getBundle(topicName)); - if (!res.isPresent()) { - return CompletableFuture.completedFuture(false); - } - - return res.get().thenApply(ob -> ob != null && ob.isActive()); + return getBundleAsync(topicName).thenCompose(bundle -> { + Optional<CompletableFuture<OwnedBundle>> optionalFuture = ownershipCache.getOwnedBundleAsync(bundle); + if (!optionalFuture.isPresent()) { + return CompletableFuture.completedFuture(false); + } + return optionalFuture.get().thenApply(ob -> ob != null && ob.isActive()); + }); } private boolean isNamespaceOwned(NamespaceName fqnn) throws Exception { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java index a6ba5fb5d90..05e89e78803 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java @@ -295,10 +295,10 @@ public class OwnershipCache { /** * Disable bundle in local cache and on zk. 
- * - * @param bundle - * @throws Exception + * @Deprecated This is a dangerous method which is currently only used for test, it will occupy the ZK thread. + * Please switch to your own thread after calling this method. */ + @Deprecated public CompletableFuture<Void> disableOwnership(NamespaceBundle bundle) { return updateBundleState(bundle, false) .thenCompose(__ -> {
