This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new dd40137ee40 [fix][broker]Fix deadlock of metadata store (#20189)
dd40137ee40 is described below
commit dd40137ee401286df97607de528007eef8e82cf0
Author: fengyubiao <[email protected]>
AuthorDate: Thu May 18 11:15:53 2023 +0800
[fix][broker]Fix deadlock of metadata store (#20189)
Motivation: This task loadOrCreatePersistentTopic occupied the event thread
of the ZK client so that other ZK tasks could not be finished anymore(Including
the task itself), and it calls bundlesCache.synchronous().get(nsname) which is
a blocking method.
Modification: Since the method getBundle(topic) will eventually call the
method bundlesCache.synchronous().get(nsname), use getBundleAsync(topic)
instead of getBundle(topic) to avoid blocking the thread.
(cherry picked from commit 4678c36d4023a2bb8361e0a70673b96de33f06ac)
---
.../pulsar/broker/namespace/NamespaceService.java | 32 ++++++++++++----------
.../pulsar/broker/namespace/OwnershipCache.java | 6 ++--
2 files changed, 21 insertions(+), 17 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index a0e2bf7534c..9d8d9e3890a 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -38,7 +38,9 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
@@ -1137,16 +1139,17 @@ public class NamespaceService implements AutoCloseable {
new IllegalArgumentException("Invalid class of
NamespaceBundle: " + suName.getClass().getName()));
}
+ /**
+ * @Deprecated This method is only used in test now.
+ */
+ @Deprecated
public boolean isServiceUnitActive(TopicName topicName) {
try {
- OwnedBundle ownedBundle =
ownershipCache.getOwnedBundle(getBundle(topicName));
- if (ownedBundle == null) {
- return false;
- }
- return ownedBundle.isActive();
- } catch (Exception e) {
- LOG.warn("Unable to find OwnedBundle for topic - [{}]", topicName,
e);
- return false;
+ return isServiceUnitActiveAsync(topicName).get(pulsar.getConfig()
+ .getMetadataStoreOperationTimeoutSeconds(), SECONDS);
+ } catch (InterruptedException | ExecutionException | TimeoutException
e) {
+ LOG.warn("Unable to find OwnedBundle for topic in time - [{}]",
topicName, e);
+ throw new RuntimeException(e);
}
}
@@ -1156,12 +1159,13 @@ public class NamespaceService implements AutoCloseable {
return getBundleAsync(topicName)
.thenCompose(bundle ->
loadManager.get().checkOwnershipAsync(Optional.of(topicName), bundle));
}
- Optional<CompletableFuture<OwnedBundle>> res =
ownershipCache.getOwnedBundleAsync(getBundle(topicName));
- if (!res.isPresent()) {
- return CompletableFuture.completedFuture(false);
- }
-
- return res.get().thenApply(ob -> ob != null && ob.isActive());
+ return getBundleAsync(topicName).thenCompose(bundle -> {
+ Optional<CompletableFuture<OwnedBundle>> optionalFuture =
ownershipCache.getOwnedBundleAsync(bundle);
+ if (!optionalFuture.isPresent()) {
+ return CompletableFuture.completedFuture(false);
+ }
+ return optionalFuture.get().thenApply(ob -> ob != null &&
ob.isActive());
+ });
}
private boolean isNamespaceOwned(NamespaceName fqnn) throws Exception {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
index 7d0b5a41477..86003153714 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
@@ -288,10 +288,10 @@ public class OwnershipCache {
/**
* Disable bundle in local cache and on zk.
- *
- * @param bundle
- * @throws Exception
+ * @Deprecated This is a dangerous method which is currently only used
for test, it will occupy the ZK thread.
+ * Please switch to your own thread after calling this method.
*/
+ @Deprecated
public CompletableFuture<Void> disableOwnership(NamespaceBundle bundle) {
return updateBundleState(bundle, false)
.thenCompose(__ -> {