This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new 8a1a4be7cc6 [fix][broker]Fix deadlock of metadata store (#20189)
8a1a4be7cc6 is described below
commit 8a1a4be7cc67d357e8bf71b65c01b05aeba7617c
Author: fengyubiao <[email protected]>
AuthorDate: Thu May 18 11:15:53 2023 +0800
[fix][broker]Fix deadlock of metadata store (#20189)
Motivation: The task loadOrCreatePersistentTopic occupied the event thread
of the ZK client, so other ZK tasks (including the task itself) could no
longer finish; the task calls bundlesCache.synchronous().get(nsname), which
is a blocking method.
Modification: Since the method getBundle(topic) will eventually call the
method bundlesCache.synchronous().get(nsname), use getBundleAsync(topic)
instead of getBundle(topic) to avoid blocking the thread.
(cherry picked from commit 4678c36d4023a2bb8361e0a70673b96de33f06ac)
---
.../pulsar/broker/namespace/NamespaceService.java | 32 ++++++++++++----------
.../pulsar/broker/namespace/OwnershipCache.java | 6 ++--
2 files changed, 21 insertions(+), 17 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index beac6a07b7f..bd52120fdcb 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -37,7 +37,9 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
@@ -1003,26 +1005,28 @@ public class NamespaceService implements AutoCloseable {
new IllegalArgumentException("Invalid class of
NamespaceBundle: " + suName.getClass().getName()));
}
+ /**
+ * @Deprecated This method is only used in test now.
+ */
+ @Deprecated
public boolean isServiceUnitActive(TopicName topicName) {
try {
- OwnedBundle ownedBundle =
ownershipCache.getOwnedBundle(getBundle(topicName));
- if (ownedBundle == null) {
- return false;
- }
- return ownedBundle.isActive();
- } catch (Exception e) {
- LOG.warn("Unable to find OwnedBundle for topic - [{}]", topicName,
e);
- return false;
+ return isServiceUnitActiveAsync(topicName).get(pulsar.getConfig()
+ .getMetadataStoreOperationTimeoutSeconds(), SECONDS);
+ } catch (InterruptedException | ExecutionException | TimeoutException
e) {
+ LOG.warn("Unable to find OwnedBundle for topic in time - [{}]",
topicName, e);
+ throw new RuntimeException(e);
}
}
public CompletableFuture<Boolean> isServiceUnitActiveAsync(TopicName
topicName) {
- Optional<CompletableFuture<OwnedBundle>> res =
ownershipCache.getOwnedBundleAsync(getBundle(topicName));
- if (!res.isPresent()) {
- return CompletableFuture.completedFuture(false);
- }
-
- return res.get().thenApply(ob -> ob != null && ob.isActive());
+ return getBundleAsync(topicName).thenCompose(bundle -> {
+ Optional<CompletableFuture<OwnedBundle>> optionalFuture =
ownershipCache.getOwnedBundleAsync(bundle);
+ if (!optionalFuture.isPresent()) {
+ return CompletableFuture.completedFuture(false);
+ }
+ return optionalFuture.get().thenApply(ob -> ob != null &&
ob.isActive());
+ });
}
private boolean isNamespaceOwned(NamespaceName fqnn) throws Exception {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
index cdc12876ac4..bc36ce09080 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
@@ -290,10 +290,10 @@ public class OwnershipCache {
/**
* Disable bundle in local cache and on zk.
- *
- * @param bundle
- * @throws Exception
+ * @Deprecated This is a dangerous method which is currently only used
for test, it will occupy the ZK thread.
+ * Please switch to your own thread after calling this method.
*/
+ @Deprecated
public CompletableFuture<Void> disableOwnership(NamespaceBundle bundle) {
return updateBundleState(bundle, false)
.thenCompose(__ -> {