This is an automated email from the ASF dual-hosted git repository.

massakam pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e395efc  Fixed deadlock by triggering asyncCreateFullPathOptimistic() callback in different thread (#3591)
e395efc is described below

commit e395efc9129fac9c4bcacd37594735ca5414233f
Author: Matteo Merli <[email protected]>
AuthorDate: Wed Feb 13 21:36:57 2019 -0800

    Fixed deadlock by triggering asyncCreateFullPathOptimistic() callback in different thread (#3591)
---
 .../bookkeeper/mledger/impl/MetaStoreImplZookeeper.java   | 15 +++++++++------
 .../mledger/impl/MetaStoreImplZookeeperTest.java          | 15 +++++++++------
 2 files changed, 18 insertions(+), 12 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeper.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeper.java
index 4d90b91..2e69614 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeper.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeper.java
@@ -29,6 +29,7 @@ import java.io.File;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.ForkJoinPool;
 import java.util.function.Consumer;
 
 import org.apache.bookkeeper.common.util.OrderedExecutor;
@@ -154,7 +155,7 @@ public class MetaStoreImplZookeeper implements MetaStore {
                                 }
                             };
 
-                            asyncCreateFullPathOptimistic(zk, prefixName, 
ledgerName, new byte[0], Acl,
+                            asyncCreateFullPathOptimistic(prefixName, 
ledgerName, new byte[0], Acl,
                                                           
CreateMode.PERSISTENT, createcb);
                         } else {
                             // Tried to open a managed ledger but it doesn't 
exist and we shouldn't creating it at this
@@ -362,20 +363,22 @@ public class MetaStoreImplZookeeper implements MetaStore {
 
     }
 
-    public static void asyncCreateFullPathOptimistic(
-            final ZooKeeper zk, final String basePath, final String nodePath, 
final byte[] data,
+    void asyncCreateFullPathOptimistic(
+            final String basePath, final String nodePath, final byte[] data,
             final List<ACL> acl, final CreateMode createMode, final 
StringCallback callback) {
         String fullPath = basePath + "/" + nodePath;
 
         zk.create(fullPath, data, acl, createMode,
                   (rc, path, ignoreCtx1, name) -> {
                       Runnable retry = () -> {
-                          asyncCreateFullPathOptimistic(zk, basePath, 
nodePath, data,
+                          asyncCreateFullPathOptimistic(basePath, nodePath, 
data,
                                                         acl, createMode, 
callback);
                       };
 
                       Consumer<Integer> complete = (finalrc) -> {
-                          callback.processResult(finalrc, path, null, name);
+                          executor.executeOrdered(nodePath, safeRun(() -> {
+                              callback.processResult(finalrc, path, null, 
name);
+                          }));
                       };
 
                       if (rc != Code.NONODE.intValue()) {
@@ -403,7 +406,7 @@ public class MetaStoreImplZookeeper implements MetaStore {
                       } else {
                           nodeParent = nodeParent.replace("\\", "/");
                           asyncCreateFullPathOptimistic(
-                                  zk, basePath, nodeParent, new byte[0], acl, 
CreateMode.PERSISTENT,
+                                  basePath, nodeParent, new byte[0], acl, 
CreateMode.PERSISTENT,
                                   (parentRc, parentPath, ignoreCtx3, 
parentName) -> {
                                       if (parentRc == Code.OK.intValue() || 
parentRc == Code.NODEEXISTS.intValue()) {
                                           retry.run();
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeperTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeperTest.java
index 5f2b89a..021e4c1 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeperTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeperTest.java
@@ -222,8 +222,10 @@ public class MetaStoreImplZookeeperTest extends 
MockedBookKeeperTestCase {
     @Test(timeOut = 20000)
     public void createOptimisticBaseNotExist() throws Exception {
         CompletableFuture<Void> promise = new CompletableFuture<>();
-        MetaStoreImplZookeeper.asyncCreateFullPathOptimistic(
-                zkc, "/foo", "bar/zar/gar", new byte[0], 
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT,
+
+        MetaStoreImplZookeeper store = new MetaStoreImplZookeeper(zkc, 
executor);
+        store.asyncCreateFullPathOptimistic(
+                "/foo", "bar/zar/gar", new byte[0], 
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT,
                 (rc, path, ctx, name) -> {
                     if (rc != KeeperException.Code.OK.intValue()) {
                         
promise.completeExceptionally(KeeperException.create(rc));
@@ -241,10 +243,11 @@ public class MetaStoreImplZookeeperTest extends 
MockedBookKeeperTestCase {
 
     @Test(timeOut = 20000)
     public void createOptimisticBaseExists() throws Exception {
+        MetaStoreImplZookeeper store = new MetaStoreImplZookeeper(zkc, 
executor);
         zkc.create("/foo", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, 
CreateMode.PERSISTENT);
         CompletableFuture<Void> promise = new CompletableFuture<>();
-        MetaStoreImplZookeeper.asyncCreateFullPathOptimistic(
-                zkc, "/foo", "bar/zar/gar", new byte[0], 
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT,
+        store.asyncCreateFullPathOptimistic(
+                "/foo", "bar/zar/gar", new byte[0], 
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT,
                 (rc, path, ctx, name) -> {
                     if (rc != KeeperException.Code.OK.intValue()) {
                         
promise.completeExceptionally(KeeperException.create(rc));
@@ -255,8 +258,8 @@ public class MetaStoreImplZookeeperTest extends 
MockedBookKeeperTestCase {
         promise.get();
 
         CompletableFuture<Void> promise2 = new CompletableFuture<>();
-        MetaStoreImplZookeeper.asyncCreateFullPathOptimistic(
-                zkc, "/foo", "blah", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, 
CreateMode.PERSISTENT,
+        store.asyncCreateFullPathOptimistic(
+                "/foo", "blah", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, 
CreateMode.PERSISTENT,
                 (rc, path, ctx, name) -> {
                     if (rc != KeeperException.Code.OK.intValue()) {
                         
promise2.completeExceptionally(KeeperException.create(rc));

Reply via email to