This is an automated email from the ASF dual-hosted git repository.
massakam pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e395efc Fixed deadlock by triggering asyncCreateFullPathOptimistic() callback in different thread (#3591)
e395efc is described below
commit e395efc9129fac9c4bcacd37594735ca5414233f
Author: Matteo Merli <[email protected]>
AuthorDate: Wed Feb 13 21:36:57 2019 -0800
Fixed deadlock by triggering asyncCreateFullPathOptimistic() callback in different thread (#3591)
---
.../bookkeeper/mledger/impl/MetaStoreImplZookeeper.java | 15 +++++++++------
.../mledger/impl/MetaStoreImplZookeeperTest.java | 15 +++++++++------
2 files changed, 18 insertions(+), 12 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeper.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeper.java
index 4d90b91..2e69614 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeper.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeper.java
@@ -29,6 +29,7 @@ import java.io.File;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.ForkJoinPool;
import java.util.function.Consumer;
import org.apache.bookkeeper.common.util.OrderedExecutor;
@@ -154,7 +155,7 @@ public class MetaStoreImplZookeeper implements MetaStore {
}
};
- asyncCreateFullPathOptimistic(zk, prefixName,
ledgerName, new byte[0], Acl,
+ asyncCreateFullPathOptimistic(prefixName,
ledgerName, new byte[0], Acl,
CreateMode.PERSISTENT, createcb);
} else {
// Tried to open a managed ledger but it doesn't
exist and we shouldn't creating it at this
@@ -362,20 +363,22 @@ public class MetaStoreImplZookeeper implements MetaStore {
}
- public static void asyncCreateFullPathOptimistic(
- final ZooKeeper zk, final String basePath, final String nodePath,
final byte[] data,
+ void asyncCreateFullPathOptimistic(
+ final String basePath, final String nodePath, final byte[] data,
final List<ACL> acl, final CreateMode createMode, final
StringCallback callback) {
String fullPath = basePath + "/" + nodePath;
zk.create(fullPath, data, acl, createMode,
(rc, path, ignoreCtx1, name) -> {
Runnable retry = () -> {
- asyncCreateFullPathOptimistic(zk, basePath,
nodePath, data,
+ asyncCreateFullPathOptimistic(basePath, nodePath,
data,
acl, createMode,
callback);
};
Consumer<Integer> complete = (finalrc) -> {
- callback.processResult(finalrc, path, null, name);
+ executor.executeOrdered(nodePath, safeRun(() -> {
+ callback.processResult(finalrc, path, null,
name);
+ }));
};
if (rc != Code.NONODE.intValue()) {
@@ -403,7 +406,7 @@ public class MetaStoreImplZookeeper implements MetaStore {
} else {
nodeParent = nodeParent.replace("\\", "/");
asyncCreateFullPathOptimistic(
- zk, basePath, nodeParent, new byte[0], acl,
CreateMode.PERSISTENT,
+ basePath, nodeParent, new byte[0], acl,
CreateMode.PERSISTENT,
(parentRc, parentPath, ignoreCtx3,
parentName) -> {
if (parentRc == Code.OK.intValue() ||
parentRc == Code.NODEEXISTS.intValue()) {
retry.run();
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeperTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeperTest.java
index 5f2b89a..021e4c1 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeperTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeperTest.java
@@ -222,8 +222,10 @@ public class MetaStoreImplZookeeperTest extends MockedBookKeeperTestCase {
@Test(timeOut = 20000)
public void createOptimisticBaseNotExist() throws Exception {
CompletableFuture<Void> promise = new CompletableFuture<>();
- MetaStoreImplZookeeper.asyncCreateFullPathOptimistic(
- zkc, "/foo", "bar/zar/gar", new byte[0],
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT,
+
+ MetaStoreImplZookeeper store = new MetaStoreImplZookeeper(zkc,
executor);
+ store.asyncCreateFullPathOptimistic(
+ "/foo", "bar/zar/gar", new byte[0],
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT,
(rc, path, ctx, name) -> {
if (rc != KeeperException.Code.OK.intValue()) {
promise.completeExceptionally(KeeperException.create(rc));
@@ -241,10 +243,11 @@ public class MetaStoreImplZookeeperTest extends
MockedBookKeeperTestCase {
@Test(timeOut = 20000)
public void createOptimisticBaseExists() throws Exception {
+ MetaStoreImplZookeeper store = new MetaStoreImplZookeeper(zkc,
executor);
zkc.create("/foo", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
CompletableFuture<Void> promise = new CompletableFuture<>();
- MetaStoreImplZookeeper.asyncCreateFullPathOptimistic(
- zkc, "/foo", "bar/zar/gar", new byte[0],
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT,
+ store.asyncCreateFullPathOptimistic(
+ "/foo", "bar/zar/gar", new byte[0],
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT,
(rc, path, ctx, name) -> {
if (rc != KeeperException.Code.OK.intValue()) {
promise.completeExceptionally(KeeperException.create(rc));
@@ -255,8 +258,8 @@ public class MetaStoreImplZookeeperTest extends
MockedBookKeeperTestCase {
promise.get();
CompletableFuture<Void> promise2 = new CompletableFuture<>();
- MetaStoreImplZookeeper.asyncCreateFullPathOptimistic(
- zkc, "/foo", "blah", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT,
+ store.asyncCreateFullPathOptimistic(
+ "/foo", "blah", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT,
(rc, path, ctx, name) -> {
if (rc != KeeperException.Code.OK.intValue()) {
promise2.completeExceptionally(KeeperException.create(rc));