This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 8a4f5cc Don't initialize /managed-ledgers on client creation (#2379) (#2509) 8a4f5cc is described below commit 8a4f5ccad08522a5774dbc0500b3f48ea73a6fb6 Author: Ivan Kelly <iv...@apache.org> AuthorDate: Tue Sep 4 22:18:46 2018 +0200 Don't initialize /managed-ledgers on client creation (#2379) (#2509) Normally the /managed-ledgers znode is created by the initialize-cluster-metadata command when a cluster is being turned up. However, the ManagedLedger client also creates it on boot. This has caused issues in the past, where if a broker is started before initialize-cluster-metadata is run, then initialize-cluster-metadata fails because it sees the /managed-ledgers znode. This patch removes the automatic creation of this znode from the client boot process. --- .../mledger/impl/MetaStoreImplZookeeper.java | 64 +++++++++++++++++++--- .../mledger/impl/MetaStoreImplZookeeperTest.java | 52 ++++++++++++++++++ .../bookkeeper/test/BookKeeperClusterTestCase.java | 4 ++ .../bookkeeper/test/MockedBookKeeperTestCase.java | 4 ++ 4 files changed, 117 insertions(+), 7 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeper.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeper.java index 354c04f..4d90b91 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeper.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeper.java @@ -25,9 +25,11 @@ import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.TextFormat; import com.google.protobuf.TextFormat.ParseException; +import java.io.File; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.List; +import java.util.function.Consumer; import org.apache.bookkeeper.common.util.OrderedExecutor; import 
org.apache.bookkeeper.mledger.ManagedLedgerException; @@ -35,7 +37,6 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.BadVersionException; import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo; -import org.apache.bookkeeper.util.ZkUtils; import org.apache.zookeeper.AsyncCallback.StringCallback; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; @@ -95,10 +96,6 @@ public class MetaStoreImplZookeeper implements MetaStore { throws Exception { this.zk = zk; this.executor = executor; - - if (zk.exists(prefixName, false) == null) { - zk.create(prefixName, new byte[0], Acl, CreateMode.PERSISTENT); - } } // @@ -157,8 +154,8 @@ public class MetaStoreImplZookeeper implements MetaStore { } }; - ZkUtils.asyncCreateFullPathOptimistic(zk, prefix + ledgerName, new byte[0], Acl, - CreateMode.PERSISTENT, createcb, null); + asyncCreateFullPathOptimistic(zk, prefixName, ledgerName, new byte[0], Acl, + CreateMode.PERSISTENT, createcb); } else { // Tried to open a managed ledger but it doesn't exist and we shouldn't creating it at this // point @@ -365,5 +362,58 @@ public class MetaStoreImplZookeeper implements MetaStore { } + public static void asyncCreateFullPathOptimistic( + final ZooKeeper zk, final String basePath, final String nodePath, final byte[] data, + final List<ACL> acl, final CreateMode createMode, final StringCallback callback) { + String fullPath = basePath + "/" + nodePath; + + zk.create(fullPath, data, acl, createMode, + (rc, path, ignoreCtx1, name) -> { + Runnable retry = () -> { + asyncCreateFullPathOptimistic(zk, basePath, nodePath, data, + acl, createMode, callback); + }; + + Consumer<Integer> complete = (finalrc) -> { + callback.processResult(finalrc, path, null, name); + }; + + if (rc != Code.NONODE.intValue()) { + complete.accept(rc); + 
return; + } + + // Since I got a nonode, it means that my parents don't exist + // create mode is persistent since ephemeral nodes can't be + // parents + String nodeParent = new File(nodePath).getParent(); + if (nodeParent == null) { + zk.exists(basePath, false, + (existsRc, existsPath, ignoreCtx2, stat) -> { + if (existsRc == Code.OK.intValue()) { + if (stat != null) { + retry.run(); + } else { + complete.accept(Code.NONODE.intValue()); + } + } else { + complete.accept(existsRc); + } + }, null); + } else { + nodeParent = nodeParent.replace("\\", "/"); + asyncCreateFullPathOptimistic( + zk, basePath, nodeParent, new byte[0], acl, CreateMode.PERSISTENT, + (parentRc, parentPath, ignoreCtx3, parentName) -> { + if (parentRc == Code.OK.intValue() || parentRc == Code.NODEEXISTS.intValue()) { + retry.run(); + } else { + complete.accept(parentRc); + } + }); + } + }, null); + } + private static final Logger log = LoggerFactory.getLogger(MetaStoreImplZookeeper.class); } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeperTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeperTest.java index 17b9962..5f2b89a 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeperTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeperTest.java @@ -18,10 +18,13 @@ */ package org.apache.bookkeeper.mledger.impl; +import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.fail; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicReference; import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException; import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback; @@ -31,6 +34,7 @@ import 
org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo; import org.apache.bookkeeper.test.MockedBookKeeperTestCase; import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.Code; import org.apache.zookeeper.ZooDefs; import org.testng.annotations.Test; @@ -214,4 +218,52 @@ public class MetaStoreImplZookeeperTest extends MockedBookKeeperTestCase { latch.await(); } + + @Test(timeOut = 20000) + public void createOptimisticBaseNotExist() throws Exception { + CompletableFuture<Void> promise = new CompletableFuture<>(); + MetaStoreImplZookeeper.asyncCreateFullPathOptimistic( + zkc, "/foo", "bar/zar/gar", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, + (rc, path, ctx, name) -> { + if (rc != KeeperException.Code.OK.intValue()) { + promise.completeExceptionally(KeeperException.create(rc)); + } else { + promise.complete(null); + } + }); + try { + promise.get(); + fail("should have failed"); + } catch (ExecutionException ee) { + assertEquals(ee.getCause().getClass(), KeeperException.NoNodeException.class); + } + } + + @Test(timeOut = 20000) + public void createOptimisticBaseExists() throws Exception { + zkc.create("/foo", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + CompletableFuture<Void> promise = new CompletableFuture<>(); + MetaStoreImplZookeeper.asyncCreateFullPathOptimistic( + zkc, "/foo", "bar/zar/gar", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, + (rc, path, ctx, name) -> { + if (rc != KeeperException.Code.OK.intValue()) { + promise.completeExceptionally(KeeperException.create(rc)); + } else { + promise.complete(null); + } + }); + promise.get(); + + CompletableFuture<Void> promise2 = new CompletableFuture<>(); + MetaStoreImplZookeeper.asyncCreateFullPathOptimistic( + zkc, "/foo", "blah", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, 
CreateMode.PERSISTENT, + (rc, path, ctx, name) -> { + if (rc != KeeperException.Code.OK.intValue()) { + promise2.completeExceptionally(KeeperException.create(rc)); + } else { + promise2.complete(null); + } + }); + promise2.get(); + } } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java b/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java index 1680f71..7f90042 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java @@ -47,7 +47,9 @@ import org.apache.bookkeeper.net.BookieSocketAddress; import org.apache.bookkeeper.proto.BookieServer; import org.apache.bookkeeper.replication.AutoRecoveryMain; import org.apache.commons.io.FileUtils; +import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -97,6 +99,8 @@ public abstract class BookKeeperClusterTestCase { startZKCluster(); // start bookkeeper service startBKCluster(); + + zkc.create("/managed-ledgers", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } catch (Exception e) { LOG.error("Error setting up", e); throw e; diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java b/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java index e3c18a0..251a4c8 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java @@ -28,7 +28,9 @@ import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig; import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; import 
org.apache.bookkeeper.util.ZkUtils; +import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.MockZooKeeper; +import org.apache.zookeeper.ZooDefs; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.annotations.AfterClass; @@ -79,6 +81,8 @@ public abstract class MockedBookKeeperTestCase { ManagedLedgerFactoryConfig conf = new ManagedLedgerFactoryConfig(); factory = new ManagedLedgerFactoryImpl(bkc, zkc, conf); + + zkc.create("/managed-ledgers", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } @AfterMethod