ivankelly closed pull request #1192: BP-29 (task 3): use metadata service uri 
for constructing registration client
URL: https://github.com/apache/bookkeeper/pull/1192
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub does not display its
diff once it has been merged, so the full diff is reproduced below:

diff --git 
a/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/testing/MoreAsserts.java
 
b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/testing/MoreAsserts.java
new file mode 100644
index 000000000..938bf49f2
--- /dev/null
+++ 
b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/testing/MoreAsserts.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.common.testing;
+
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.collect.Sets;
+import com.google.common.collect.Sets.SetView;
+import java.util.Set;
+
+/**
+ * Assertion utils.
+ */
+public final class MoreAsserts {
+
+    private MoreAsserts() {}
+
+    public static <T> void assertSetEquals(Set<T> expected, Set<T> actual) {
+        SetView<T> diff = Sets.difference(expected, actual);
+        assertTrue(
+            "Expected set contains items not exist at actual set : " + 
diff.immutableCopy(),
+            diff.isEmpty());
+        diff = Sets.difference(actual, expected);
+        assertTrue(
+            "Actual set contains items not exist at expected set : " + 
diff.immutableCopy(),
+            diff.isEmpty());
+    }
+
+}
diff --git 
a/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/testing/executors/MockExecutorController.java
 
b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/testing/executors/MockExecutorController.java
index 12b3a23af..4942e348b 100644
--- 
a/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/testing/executors/MockExecutorController.java
+++ 
b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/testing/executors/MockExecutorController.java
@@ -166,12 +166,7 @@ public MockExecutorController 
controlScheduleAtFixedRate(ScheduledExecutorServic
            Runnable task = invocationOnMock.getArgument(0);
            long value = invocationOnMock.getArgument(1);
            TimeUnit unit = invocationOnMock.getArgument(2);
-           DeferredTask deferredTask = executor.addDelayedTask(executor, 
unit.toMillis(value), task);
-           if (value <= 0) {
-               task.run();
-               FutureUtils.complete(deferredTask.future, null);
-           }
-           return deferredTask;
+           return executor.addDelayedTask(executor, unit.toMillis(value), 
task);
        };
     }
 
@@ -192,7 +187,12 @@ private DeferredTask addDelayedTask(
             Runnable task) {
         checkArgument(delayTimeMs >= 0);
         DeferredTask deferredTask = new DeferredTask(task, delayTimeMs);
-        executor.deferredTasks.add(deferredTask);
+        if (delayTimeMs > 0) {
+            executor.deferredTasks.add(deferredTask);
+        } else {
+            task.run();
+            FutureUtils.complete(deferredTask.future, null);
+        }
         return deferredTask;
     }
 
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
index a94f20638..d6f88e9ec 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
@@ -32,6 +32,7 @@
 import io.netty.util.HashedWheelTimer;
 import io.netty.util.concurrent.DefaultThreadFactory;
 import java.io.IOException;
+import java.net.URI;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
@@ -58,16 +59,17 @@
 import org.apache.bookkeeper.client.api.WriteFlag;
 import org.apache.bookkeeper.conf.AbstractConfiguration;
 import org.apache.bookkeeper.conf.ClientConfiguration;
-import org.apache.bookkeeper.discover.RegistrationClient;
-import org.apache.bookkeeper.discover.ZKRegistrationClient;
 import org.apache.bookkeeper.feature.Feature;
 import org.apache.bookkeeper.feature.FeatureProvider;
 import org.apache.bookkeeper.feature.SettableFeatureProvider;
-import org.apache.bookkeeper.meta.AbstractZkLedgerManagerFactory;
 import org.apache.bookkeeper.meta.CleanupLedgerManager;
 import org.apache.bookkeeper.meta.LedgerIdGenerator;
 import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.meta.LedgerManagerFactory;
+import org.apache.bookkeeper.meta.MetadataClientDriver;
+import org.apache.bookkeeper.meta.MetadataDrivers;
+import org.apache.bookkeeper.meta.exceptions.MetadataException;
+import org.apache.bookkeeper.meta.zk.ZKMetadataClientDriver;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.net.DNSToSwitchMapping;
 import org.apache.bookkeeper.proto.BookieClient;
@@ -102,7 +104,7 @@
 
     static final Logger LOG = LoggerFactory.getLogger(BookKeeper.class);
 
-    final RegistrationClient regClient;
+
     final EventLoopGroup eventLoopGroup;
 
     // The stats logger for this client.
@@ -142,6 +144,7 @@
     // Features
     final Feature disableEnsembleChangeFeature;
 
+    final MetadataClientDriver metadataDriver;
     // Ledger manager responsible for how to store ledger meta data
     final LedgerManagerFactory ledgerManagerFactory;
     final LedgerManager ledgerManager;
@@ -441,21 +444,21 @@ public BookKeeper(ClientConfiguration conf, ZooKeeper zk, 
EventLoopGroup eventLo
         this.disableEnsembleChangeFeature =
             
this.featureProvider.getFeature(conf.getDisableEnsembleChangeFeatureName());
 
-        // initialize registration client
+        // initialize metadata driver
         try {
-            Class<? extends RegistrationClient> regClientCls = 
conf.getRegistrationClientClass();
-            this.regClient = ReflectionUtils.newInstance(regClientCls);
-            this.regClient.initialize(
+            this.metadataDriver = MetadataDrivers.getClientDriver(
+                URI.create(conf.getMetadataServiceUri()));
+            this.metadataDriver.initialize(
                 conf,
                 scheduler,
                 statsLogger,
                 java.util.Optional.ofNullable(zkc));
         } catch (ConfigurationException ce) {
-            LOG.error("Failed to initialize registration client", ce);
-            throw new IOException("Failed to initialize registration client", 
ce);
-        } catch (BKException be) {
-            LOG.error("Failed to initialize registration client", be);
-            throw new IOException("Failed to initialize registration client", 
be);
+            LOG.error("Failed to initialize metadata client driver using 
invalid metadata service uri", ce);
+            throw new IOException("Failed to initialize metadata client 
driver", ce);
+        } catch (MetadataException me) {
+            LOG.error("Encountered metadata exceptions on initializing 
metadata client driver", me);
+            throw new IOException("Failed to initialize metadata client 
driver", me);
         }
 
         // initialize event loop group
@@ -505,7 +508,7 @@ public BookKeeper(ClientConfiguration conf, ZooKeeper zk, 
EventLoopGroup eventLo
         this.bookieClient = new BookieClient(conf, this.eventLoopGroup, 
this.mainWorkerPool,
                                              scheduler, statsLogger);
         this.bookieWatcher = new BookieWatcher(
-                conf, this.placementPolicy, regClient,
+                conf, this.placementPolicy, 
metadataDriver.getRegistrationClient(),
                 this.statsLogger.scope(WATCHER_SCOPE));
         if (conf.getDiskWeightBasedPlacementEnabled()) {
             LOG.info("Weighted ledger placement enabled");
@@ -525,9 +528,9 @@ public BookKeeper(ClientConfiguration conf, ZooKeeper zk, 
EventLoopGroup eventLo
         // initialize ledger manager
         try {
             this.ledgerManagerFactory =
-                AbstractZkLedgerManagerFactory.newLedgerManagerFactory(conf, 
regClient.getLayoutManager());
-        } catch (IOException | InterruptedException e) {
-            throw e;
+                this.metadataDriver.getLedgerManagerFactory();
+        } catch (MetadataException e) {
+            throw new IOException("Failed to initialize ledger manager 
factory", e);
         }
         this.ledgerManager = new 
CleanupLedgerManager(ledgerManagerFactory.newLedgerManager());
         this.ledgerIdGenerator = ledgerManagerFactory.newLedgerIdGenerator();
@@ -549,7 +552,7 @@ public BookKeeper(ClientConfiguration conf, ZooKeeper zk, 
EventLoopGroup eventLo
         scheduler = null;
         requestTimer = null;
         reorderReadSequence = false;
-        regClient = null;
+        metadataDriver = null;
         readSpeculativeRequestPolicy = Optional.absent();
         readLACSpeculativeRequestPolicy = Optional.absent();
         placementPolicy = null;
@@ -680,8 +683,8 @@ boolean isReorderReadSequence() {
     }
 
     @VisibleForTesting
-    public RegistrationClient getRegClient() {
-        return regClient;
+    public MetadataClientDriver getMetadataClientDriver() {
+        return metadataDriver;
     }
 
     /**
@@ -731,7 +734,7 @@ public static DigestType 
fromApiDigestType(org.apache.bookkeeper.client.api.Dige
     }
 
     ZooKeeper getZkHandle() {
-        return ((ZKRegistrationClient) regClient).getZk();
+        return ((ZKMetadataClientDriver) metadataDriver).getZk();
     }
 
     protected ClientConfiguration getConf() {
@@ -1450,7 +1453,6 @@ public void close() throws BKException, 
InterruptedException {
             // which will reject any incoming metadata requests.
             ledgerManager.close();
             ledgerIdGenerator.close();
-            ledgerManagerFactory.close();
         } catch (IOException ie) {
             LOG.error("Failed to close ledger manager : ", ie);
         }
@@ -1477,7 +1479,7 @@ public void close() throws BKException, 
InterruptedException {
         if (ownEventLoopGroup) {
             eventLoopGroup.shutdownGracefully();
         }
-        this.regClient.close();
+        this.metadataDriver.close();
     }
 
     private void initOpLoggers(StatsLogger stats) {
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
index dcface641..a71d2b4a1 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
@@ -228,7 +228,10 @@ public void close() throws InterruptedException, 
BKException {
      */
     public void watchWritableBookiesChanged(final RegistrationListener 
listener)
             throws BKException {
-        bkc.regClient.watchWritableBookies(listener);
+        bkc
+            .getMetadataClientDriver()
+            .getRegistrationClient()
+            .watchWritableBookies(listener);
     }
 
     /**
@@ -240,7 +243,10 @@ public void watchWritableBookiesChanged(final 
RegistrationListener listener)
      */
     public void watchReadOnlyBookiesChanged(final RegistrationListener 
listener)
             throws BKException {
-        bkc.regClient.watchReadOnlyBookies(listener);
+        bkc
+            .getMetadataClientDriver()
+            .getRegistrationClient()
+            .watchReadOnlyBookies(listener);
     }
 
     /**
@@ -1153,7 +1159,7 @@ public static boolean format(ServerConfiguration conf,
             }
 
             BookKeeper bkc = new BookKeeper(new ClientConfiguration(conf));
-            bkc.ledgerManagerFactory.format(conf, 
bkc.regClient.getLayoutManager());
+            bkc.ledgerManagerFactory.format(conf, 
bkc.getMetadataClientDriver().getLayoutManager());
 
             return rm.format();
         }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieInfoReader.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieInfoReader.java
index 7639703c1..112cd4821 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieInfoReader.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieInfoReader.java
@@ -233,7 +233,10 @@ public boolean completeUnlessQueued() {
     }
 
     public void start() {
-        this.bk.regClient.watchWritableBookies(bookies -> 
availableBookiesChanged(bookies.getValue()));
+        this.bk
+            .getMetadataClientDriver()
+            .getRegistrationClient()
+            .watchWritableBookies(bookies -> 
availableBookiesChanged(bookies.getValue()));
         scheduler.scheduleAtFixedRate(new Runnable() {
             @Override
             public void run() {
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
index 9c90c333e..68d27ad44 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
@@ -215,6 +215,8 @@ public String getMetadataServiceUri() throws 
ConfigurationException {
                     ledgerManagerType = 
org.apache.bookkeeper.meta.FlatLedgerManagerFactory.NAME;
                 } else if (factoryClass == 
LongHierarchicalLedgerManagerFactory.class) {
                     ledgerManagerType = 
LongHierarchicalLedgerManagerFactory.NAME;
+                } else if (factoryClass == 
org.apache.bookkeeper.meta.MSLedgerManagerFactory.class) {
+                    ledgerManagerType = 
org.apache.bookkeeper.meta.MSLedgerManagerFactory.NAME;
                 } else {
                     throw new IllegalArgumentException("Unknown zookeeper 
based ledger manager factory : "
                         + factoryClass);
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
index 51aa699b9..6511f929f 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
@@ -1718,7 +1718,9 @@ public ClientConfiguration 
setNettyUsePooledBuffers(boolean enabled) {
      *
      * @param regClientClass
      *            ClientClass
+     * @deprecated since 4.7.0
      */
+    @Deprecated
     public ClientConfiguration setRegistrationClientClass(
             Class<? extends RegistrationClient> regClientClass) {
         setProperty(REGISTRATION_CLIENT_CLASS, regClientClass);
@@ -1729,7 +1731,9 @@ public ClientConfiguration setRegistrationClientClass(
      * Get Registration Client Class.
      *
      * @return registration manager class.
+     * @deprecated since 4.7.0
      */
+    @Deprecated
     public Class<? extends RegistrationClient> getRegistrationClientClass()
             throws ConfigurationException {
         return ReflectionUtils.getClass(this, REGISTRATION_CLIENT_CLASS,
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/RegistrationClient.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/RegistrationClient.java
index 7b18325b1..b53fd2405 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/RegistrationClient.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/RegistrationClient.java
@@ -18,17 +18,11 @@
 
 package org.apache.bookkeeper.discover;
 
-import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ScheduledExecutorService;
-import org.apache.bookkeeper.client.BKException;
 import 
org.apache.bookkeeper.common.annotation.InterfaceAudience.LimitedPrivate;
 import org.apache.bookkeeper.common.annotation.InterfaceStability.Evolving;
-import org.apache.bookkeeper.conf.ClientConfiguration;
-import org.apache.bookkeeper.meta.LayoutManager;
 import org.apache.bookkeeper.net.BookieSocketAddress;
-import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.versioning.Versioned;
 
 /**
@@ -47,22 +41,6 @@
 
     }
 
-    /**
-     * Initialize the registration client with provided resources.
-     *
-     * <p>The existence of <i>zkSupplier</i> is for backward compatability.
-     *
-     * @param conf client configuration
-     * @param statsLogger stats logger
-     * @param optionalCtx optional context is passed for initialization.
-     * @return
-     */
-    RegistrationClient initialize(ClientConfiguration conf,
-                                  ScheduledExecutorService scheduler,
-                                  StatsLogger statsLogger,
-                                  Optional<Object> optionalCtx)
-        throws BKException;
-
     @Override
     void close();
 
@@ -116,11 +94,4 @@ RegistrationClient initialize(ClientConfiguration conf,
      */
     void unwatchReadOnlyBookies(RegistrationListener listener);
 
-    /**
-     * Gets layout manager.
-     *
-     * @return the layout manager
-     */
-    LayoutManager getLayoutManager();
-
 }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationClient.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationClient.java
index 237809a08..ff48a08b3 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationClient.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationClient.java
@@ -18,14 +18,13 @@
 
 package org.apache.bookkeeper.discover;
 
+import static org.apache.bookkeeper.util.BookKeeperConstants.AVAILABLE_NODE;
 import static org.apache.bookkeeper.util.BookKeeperConstants.READONLY;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Sets;
 import java.io.IOException;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CopyOnWriteArraySet;
@@ -33,33 +32,23 @@
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.function.BiConsumer;
+import lombok.AccessLevel;
+import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.bookkeeper.client.BKException;
-import org.apache.bookkeeper.client.BKException.BKInterruptedException;
 import org.apache.bookkeeper.client.BKException.Code;
 import org.apache.bookkeeper.client.BKException.ZKException;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.bookkeeper.common.util.SafeRunnable;
-import org.apache.bookkeeper.conf.ClientConfiguration;
-import org.apache.bookkeeper.meta.LayoutManager;
-import org.apache.bookkeeper.meta.ZkLayoutManager;
 import org.apache.bookkeeper.net.BookieSocketAddress;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.bookkeeper.versioning.LongVersion;
 import org.apache.bookkeeper.versioning.Version;
 import org.apache.bookkeeper.versioning.Version.Occurred;
 import org.apache.bookkeeper.versioning.Versioned;
-import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
-import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.Watcher.Event.EventType;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
 import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.ACL;
 
 /**
  * ZooKeeper based {@link RegistrationClient}.
@@ -67,9 +56,9 @@
 @Slf4j
 public class ZKRegistrationClient implements RegistrationClient {
 
-    private static final int ZK_CONNECT_BACKOFF_MS = 200;
+    static final int ZK_CONNECT_BACKOFF_MS = 200;
 
-    private class WatchTask
+    class WatchTask
         implements SafeRunnable,
                    Watcher,
                    BiConsumer<Versioned<Set<BookieSocketAddress>>, Throwable>,
@@ -133,13 +122,15 @@ public void safeRun() {
         @Override
         public void accept(Versioned<Set<BookieSocketAddress>> bookieSet, 
Throwable throwable) {
             if (throwable != null) {
-                scheduleWatchTask(ZK_CONNECT_BACKOFF_MS);
-                firstRunFuture.completeExceptionally(throwable);
+                if (firstRunFuture.isDone()) {
+                    scheduleWatchTask(ZK_CONNECT_BACKOFF_MS);
+                } else {
+                    firstRunFuture.completeExceptionally(throwable);
+                }
                 return;
             }
 
-            if (this.version.compare(bookieSet.getVersion()) == Occurred.BEFORE
-                || this.version.compare(bookieSet.getVersion()) == 
Occurred.CONCURRENTLY) {
+            if (this.version.compare(bookieSet.getVersion()) == 
Occurred.BEFORE) {
                 this.version = bookieSet.getVersion();
                 this.bookies = bookieSet.getValue();
 
@@ -169,101 +160,45 @@ synchronized boolean isClosed() {
 
         @Override
         public synchronized void close() {
-            if (!closed) {
+            if (closed) {
                 return;
             }
+            zk.removeWatches(
+                regPath,
+                this,
+                WatcherType.Children,
+                true,
+                (rc, path, ctx) -> {},
+                null
+            );
             closed = true;
         }
     }
 
-    private ClientConfiguration conf;
-    private ZooKeeper zk = null;
-    // whether the zk handle is one we created, or is owned by whoever
-    // instantiated us
-    private boolean ownZKHandle = false;
-    private ScheduledExecutorService scheduler;
+    private final ZooKeeper zk;
+    private final ScheduledExecutorService scheduler;
+    @Getter(AccessLevel.PACKAGE)
     private WatchTask watchWritableBookiesTask = null;
+    @Getter(AccessLevel.PACKAGE)
     private WatchTask watchReadOnlyBookiesTask = null;
 
     // registration paths
-    private String bookieRegistrationPath;
-    private String bookieReadonlyRegistrationPath;
+    private final String bookieRegistrationPath;
+    private final String bookieReadonlyRegistrationPath;
 
-    // layout manager
-    private List<ACL> acls;
-    private LayoutManager layoutManager;
-
-    @Override
-    public RegistrationClient initialize(ClientConfiguration conf,
-                                         ScheduledExecutorService scheduler,
-                                         StatsLogger statsLogger,
-                                         Optional<Object> zkOptional)
-            throws BKException {
-        this.conf = conf;
+    public ZKRegistrationClient(ZooKeeper zk,
+                                String ledgersRootPath,
+                                ScheduledExecutorService scheduler) {
+        this.zk = zk;
         this.scheduler = scheduler;
 
-        this.bookieRegistrationPath = conf.getZkAvailableBookiesPath();
+        this.bookieRegistrationPath = ledgersRootPath + "/" + AVAILABLE_NODE;
         this.bookieReadonlyRegistrationPath = this.bookieRegistrationPath + 
"/" + READONLY;
-
-        this.acls = ZkUtils.getACLs(conf);
-
-        // construct the zookeeper
-        if (zkOptional.isPresent()
-            && zkOptional.get() instanceof ZooKeeper) {
-            // if an external zookeeper is added, use the zookeeper instance
-            this.zk = (ZooKeeper) (zkOptional.get());
-            this.ownZKHandle = false;
-        } else {
-            try {
-                this.zk = ZooKeeperClient.newBuilder()
-                    .connectString(conf.getZkServers())
-                    .sessionTimeoutMs(conf.getZkTimeout())
-                    .operationRetryPolicy(new 
BoundExponentialBackoffRetryPolicy(conf.getZkTimeout(),
-                        conf.getZkTimeout(), 0))
-                    .statsLogger(statsLogger)
-                    .build();
-
-                if (null == zk.exists(bookieReadonlyRegistrationPath, false)) {
-                    try {
-                        zk.create(bookieReadonlyRegistrationPath,
-                            new byte[0],
-                            acls,
-                            CreateMode.PERSISTENT);
-                    } catch (KeeperException.NodeExistsException e) {
-                        // this node is just now created by someone.
-                    }
-                }
-            } catch (IOException | KeeperException e) {
-                log.error("Failed to create zookeeper client to {}", 
conf.getZkServers(), e);
-                ZKException zke = new ZKException();
-                zke.fillInStackTrace();
-                throw zke;
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                throw new BKInterruptedException();
-            }
-            this.ownZKHandle = true;
-        }
-
-        // layout manager
-        this.layoutManager = new ZkLayoutManager(
-            zk,
-            conf.getZkLedgersRootPath(),
-            acls);
-
-        return this;
     }
 
     @Override
     public void close() {
-        if (ownZKHandle && null != zk) {
-            try {
-                zk.close();
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                log.warn("Interrupted on closing zookeeper client", e);
-            }
-        }
+        // no-op
     }
 
     public ZooKeeper getZk() {
@@ -290,7 +225,7 @@ public ZooKeeper getZk() {
                 return;
             }
 
-            Version version = new LongVersion(stat.getVersion());
+            Version version = new LongVersion(stat.getCversion());
             Set<BookieSocketAddress> bookies = 
convertToBookieAddresses(children);
             future.complete(new Versioned<>(bookies, version));
         }, null);
@@ -300,9 +235,17 @@ public ZooKeeper getZk() {
 
     @Override
     public synchronized CompletableFuture<Void> 
watchWritableBookies(RegistrationListener listener) {
-        CompletableFuture<Void> f = new CompletableFuture<>();
+        CompletableFuture<Void> f;
         if (null == watchWritableBookiesTask) {
+            f = new CompletableFuture<>();
             watchWritableBookiesTask = new WatchTask(bookieRegistrationPath, 
f);
+            f = f.whenComplete((value, cause) -> {
+                if (null != cause) {
+                    unwatchWritableBookies(listener);
+                }
+            });
+        } else {
+            f = watchWritableBookiesTask.firstRunFuture;
         }
 
         watchWritableBookiesTask.addListener(listener);
@@ -327,9 +270,17 @@ public synchronized void 
unwatchWritableBookies(RegistrationListener listener) {
 
     @Override
     public synchronized CompletableFuture<Void> 
watchReadOnlyBookies(RegistrationListener listener) {
-        CompletableFuture<Void> f = new CompletableFuture<>();
+        CompletableFuture<Void> f;
         if (null == watchReadOnlyBookiesTask) {
+            f = new CompletableFuture<>();
             watchReadOnlyBookiesTask = new 
WatchTask(bookieReadonlyRegistrationPath, f);
+            f = f.whenComplete((value, cause) -> {
+                if (null != cause) {
+                    unwatchReadOnlyBookies(listener);
+                }
+            });
+        } else {
+            f = watchReadOnlyBookiesTask.firstRunFuture;
         }
 
         watchReadOnlyBookiesTask.addListener(listener);
@@ -371,14 +322,5 @@ public synchronized void 
unwatchReadOnlyBookies(RegistrationListener listener) {
         }
         return newBookieAddrs;
     }
-    @VisibleForTesting
-    public void setLayoutManager(LayoutManager layoutManager) {
-        this.layoutManager = layoutManager;
-    }
-
-    @Override
-    public LayoutManager getLayoutManager() {
-        return layoutManager;
-    }
 
 }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationManager.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationManager.java
index e1c5c0c81..7f2c2d96c 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationManager.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationManager.java
@@ -33,7 +33,6 @@
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
@@ -50,7 +49,6 @@
 import org.apache.bookkeeper.client.BKException.BKInterruptedException;
 import org.apache.bookkeeper.client.BKException.MetaStoreException;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
-import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.AbstractZkLedgerManagerFactory;
 import org.apache.bookkeeper.meta.LayoutManager;
@@ -58,7 +56,6 @@
 import org.apache.bookkeeper.meta.ZkLayoutManager;
 import org.apache.bookkeeper.meta.ZkLedgerUnderreplicationManager;
 import org.apache.bookkeeper.net.BookieSocketAddress;
-import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.util.BookKeeperConstants;
 import org.apache.bookkeeper.util.ZkUtils;
@@ -551,8 +548,11 @@ public boolean nukeExistingCluster() throws Exception {
 
         String availableBookiesPath = conf.getZkAvailableBookiesPath();
         boolean availableNodeExists = null != zk.exists(availableBookiesPath, 
false);
-        try (RegistrationClient regClient = new ZKRegistrationClient()) {
-            regClient.initialize(new ClientConfiguration(conf), null, 
NullStatsLogger.INSTANCE, Optional.empty());
+        try (RegistrationClient regClient = new ZKRegistrationClient(
+            zk,
+            zkLedgersRootPath,
+            null
+        )) {
             if (availableNodeExists) {
                 Collection<BookieSocketAddress> rwBookies = FutureUtils
                         .result(regClient.getWritableBookies(), 
EXCEPTION_FUNC).getValue();
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java
index a175cc4f8..c2ff956d5 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java
@@ -88,6 +88,7 @@
     private static final int MS_CONNECT_BACKOFF_MS = 200;
 
     public static final int CUR_VERSION = 1;
+    public static final String NAME = "ms";
 
     public static final String TABLE_NAME = "LEDGER";
     public static final String META_FIELD = ".META";
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MetadataClientDriver.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MetadataClientDriver.java
index 2fd302c09..b53836790 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MetadataClientDriver.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MetadataClientDriver.java
@@ -64,8 +64,7 @@ MetadataClientDriver initialize(ClientConfiguration conf,
      *
      * @return the registration client used for discovering registered bookies.
      */
-    RegistrationClient getRegistrationClient()
-        throws MetadataException;
+    RegistrationClient getRegistrationClient();
 
     /**
      * Return the ledger manager factory used for accessing ledger metadata.
@@ -75,6 +74,13 @@ RegistrationClient getRegistrationClient()
     LedgerManagerFactory getLedgerManagerFactory()
         throws MetadataException;
 
+    /**
+     * Return the layout manager.
+     *
+     * @return the layout manager.
+     */
+    LayoutManager getLayoutManager();
+
     @Override
     void close();
 
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MetadataDrivers.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MetadataDrivers.java
index 8a657f26f..d124700de 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MetadataDrivers.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MetadataDrivers.java
@@ -167,19 +167,30 @@ private static void loadInitialBookieDrivers() {
      */
     public static void registerClientDriver(String metadataBackendScheme,
                                             Class<? extends 
MetadataClientDriver> driver) {
+        registerClientDriver(metadataBackendScheme, driver, false);
+    }
+
+    @VisibleForTesting
+    public static void registerClientDriver(String metadataBackendScheme,
+                                            Class<? extends 
MetadataClientDriver> driver,
+                                            boolean allowOverride) {
         if (!initialized) {
             initialize();
         }
 
         String scheme = metadataBackendScheme.toLowerCase();
         MetadataClientDriverInfo oldDriverInfo = clientDrivers.get(scheme);
-        if (null != oldDriverInfo) {
+        if (null != oldDriverInfo && !allowOverride) {
             return;
         }
         MetadataClientDriverInfo newDriverInfo = new 
MetadataClientDriverInfo(driver);
         oldDriverInfo = clientDrivers.putIfAbsent(scheme, newDriverInfo);
         if (null != oldDriverInfo) {
             log.debug("Metadata client driver for {} is already there.", 
scheme);
+            if (allowOverride) {
+                log.debug("Overriding client driver for {}", scheme);
+                clientDrivers.put(scheme, newDriverInfo);
+            }
         }
     }
 
@@ -191,19 +202,30 @@ public static void registerClientDriver(String 
metadataBackendScheme,
      */
     public static void registerBookieDriver(String metadataBackendScheme,
                                             Class<? extends 
MetadataBookieDriver> driver) {
+        registerBookieDriver(metadataBackendScheme, driver, false);
+    }
+
+    @VisibleForTesting
+    public static void registerBookieDriver(String metadataBackendScheme,
+                                            Class<? extends 
MetadataBookieDriver> driver,
+                                            boolean allowOverride) {
         if (!initialized) {
             initialize();
         }
 
         String scheme = metadataBackendScheme.toLowerCase();
         MetadataBookieDriverInfo oldDriverInfo = bookieDrivers.get(scheme);
-        if (null != oldDriverInfo) {
+        if (null != oldDriverInfo && !allowOverride) {
             return;
         }
         MetadataBookieDriverInfo newDriverInfo = new 
MetadataBookieDriverInfo(driver);
         oldDriverInfo = bookieDrivers.putIfAbsent(scheme, newDriverInfo);
         if (null != oldDriverInfo) {
             log.debug("Metadata bookie driver for {} is already there.", 
scheme);
+            if (allowOverride) {
+                log.debug("Overriding bookie driver for {}", scheme);
+                bookieDrivers.put(scheme, newDriverInfo);
+            }
         }
     }
 
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/zk/ZKMetadataClientDriver.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/zk/ZKMetadataClientDriver.java
index 3fec50cb7..9141c6ced 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/zk/ZKMetadataClientDriver.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/zk/ZKMetadataClientDriver.java
@@ -21,13 +21,11 @@
 import java.util.Optional;
 import java.util.concurrent.ScheduledExecutorService;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.discover.RegistrationClient;
 import org.apache.bookkeeper.discover.ZKRegistrationClient;
 import org.apache.bookkeeper.meta.MetadataClientDriver;
 import org.apache.bookkeeper.meta.MetadataDrivers;
-import org.apache.bookkeeper.meta.exceptions.Code;
 import org.apache.bookkeeper.meta.exceptions.MetadataException;
 import org.apache.bookkeeper.stats.StatsLogger;
 
@@ -66,21 +64,12 @@ public synchronized MetadataClientDriver 
initialize(ClientConfiguration conf,
     }
 
     @Override
-    public synchronized RegistrationClient getRegistrationClient() throws 
MetadataException {
+    public synchronized RegistrationClient getRegistrationClient() {
         if (null == regClient) {
-            regClient = new ZKRegistrationClient();
-            try {
-                regClient.initialize(
-                    clientConf,
-                    scheduler,
-                    statsLogger,
-                    Optional.of(zk));
-            } catch (BKException e) {
-                throw new MetadataException(
-                    Code.METADATA_SERVICE_ERROR,
-                    "Failed to initialize registration client",
-                    e);
-            }
+            regClient = new ZKRegistrationClient(
+                zk,
+                ledgersRootPath,
+                scheduler);
         }
         return regClient;
     }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/zk/ZKMetadataDriverBase.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/zk/ZKMetadataDriverBase.java
index f3946225d..98a018d28 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/zk/ZKMetadataDriverBase.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/zk/ZKMetadataDriverBase.java
@@ -21,11 +21,14 @@
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.bookkeeper.util.BookKeeperConstants.AVAILABLE_NODE;
+import static org.apache.bookkeeper.util.BookKeeperConstants.EMPTY_BYTE_ARRAY;
+import static org.apache.bookkeeper.util.BookKeeperConstants.READONLY;
 
 import java.io.IOException;
 import java.net.URI;
 import java.util.List;
 import java.util.Optional;
+import lombok.Getter;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.conf.AbstractConfiguration;
@@ -80,6 +83,9 @@ protected static String getZKServersFromServiceUri(URI uri) {
                 case LongHierarchicalLedgerManagerFactory.NAME:
                     ledgerManagerFactoryClass = 
LongHierarchicalLedgerManagerFactory.class;
                     break;
+                case org.apache.bookkeeper.meta.MSLedgerManagerFactory.NAME:
+                    ledgerManagerFactoryClass = 
org.apache.bookkeeper.meta.MSLedgerManagerFactory.class;
+                    break;
                 default:
                     throw new IllegalArgumentException("Unknown ledger manager 
type found '"
                         + schemeParts[1] + "' at uri : " + metadataServiceUri);
@@ -97,6 +103,7 @@ protected static String getZKServersFromServiceUri(URI uri) {
 
     // zookeeper related variables
     protected List<ACL> acls;
+    @Getter
     protected ZooKeeper zk = null;
     // whether the zk handle is one we created, or is owned by whoever
     // instantiated us
@@ -120,7 +127,7 @@ protected void initialize(AbstractConfiguration<?> conf,
                               Optional<Object> optionalCtx) throws 
MetadataException {
         this.conf = conf;
 
-        String metadataServiceUriStr;
+        final String metadataServiceUriStr;
         try {
             metadataServiceUriStr = conf.getMetadataServiceUri();
         } catch (ConfigurationException e) {
@@ -128,15 +135,19 @@ protected void initialize(AbstractConfiguration<?> conf,
             throw new MetadataException(
                 Code.INVALID_METADATA_SERVICE_URI, e);
         }
+
         this.metadataServiceUri = URI.create(metadataServiceUriStr);
         ledgerManagerFactoryClass = 
resolveLedgerManagerFactory(metadataServiceUri);
 
         // get the initialize root path
         this.ledgersRootPath = metadataServiceUri.getPath();
-        String bookieReadonlyRegistrationPath = ledgersRootPath + "/" + 
AVAILABLE_NODE;
+        final String bookieRegistrationPath = ledgersRootPath + "/" + 
AVAILABLE_NODE;
+        final String bookieReadonlyRegistrationPath = bookieRegistrationPath + 
"/" + READONLY;
 
         // construct the zookeeper
-        String zkServers = getZKServersFromServiceUri(metadataServiceUri);
+        final String zkServers = 
getZKServersFromServiceUri(metadataServiceUri);
+        log.info("Initialize zookeeper metadata driver at metadata service uri 
{} :"
+            + " zkServers = {}, ledgersRootPath = {}.", metadataServiceUriStr, 
zkServers, ledgersRootPath);
         this.acls = ZkUtils.getACLs(conf);
         if (optionalCtx.isPresent()
             && optionalCtx.get() instanceof ZooKeeper) {
@@ -156,7 +167,7 @@ protected void initialize(AbstractConfiguration<?> conf,
                 if (null == zk.exists(bookieReadonlyRegistrationPath, false)) {
                     try {
                         zk.create(bookieReadonlyRegistrationPath,
-                            new byte[0],
+                            EMPTY_BYTE_ARRAY,
                             acls,
                             CreateMode.PERSISTENT);
                     } catch (KeeperException.NodeExistsException e) {
@@ -167,7 +178,8 @@ protected void initialize(AbstractConfiguration<?> conf,
                 log.error("Failed to create zookeeper client to {}", 
zkServers, e);
                 MetadataException me = new MetadataException(
                     Code.METADATA_SERVICE_ERROR,
-                    "Failed to create zookeeper client to " + zkServers);
+                    "Failed to create zookeeper client to " + zkServers,
+                    e);
                 me.fillInStackTrace();
                 throw me;
             }
@@ -181,6 +193,10 @@ protected void initialize(AbstractConfiguration<?> conf,
             acls);
     }
 
+    /**
+     * Return the layout manager held by this driver.
+     *
+     * @return the layout manager.
+     */
+    public LayoutManager getLayoutManager() {
+        return layoutManager;
+    }
+
     @SneakyThrows
     public synchronized LedgerManagerFactory getLedgerManagerFactory()
             throws MetadataException {
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
index 75b29bad0..0c22a49f8 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
@@ -141,7 +141,7 @@ private void initialize(ServerConfiguration conf, ZooKeeper 
zkc)
             LedgerManagerFactory ledgerManagerFactory = 
AbstractZkLedgerManagerFactory
                     .newLedgerManagerFactory(
                         conf,
-                        bkc.getRegClient().getLayoutManager());
+                        bkc.getMetadataClientDriver().getLayoutManager());
             ledgerManager = ledgerManagerFactory.newLedgerManager();
             this.bookieLedgerIndexer = new BookieLedgerIndexer(ledgerManager);
 
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
index 8969aa69b..276ed7120 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
@@ -135,7 +135,7 @@ public ReplicationWorker(final ZooKeeper zkc,
         LedgerManagerFactory mFactory = AbstractZkLedgerManagerFactory
                 .newLedgerManagerFactory(
                     this.conf,
-                    bkc.getRegClient().getLayoutManager());
+                    bkc.getMetadataClientDriver().getLayoutManager());
         this.underreplicationManager = mFactory
                 .newLedgerUnderreplicationManager();
         this.admin = new BookKeeperAdmin(bkc, statsLogger);
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/helpers/DiscoveryCommand.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/helpers/DiscoveryCommand.java
index 1ebfd88c5..b8f93a517 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/helpers/DiscoveryCommand.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/helpers/DiscoveryCommand.java
@@ -18,6 +18,7 @@
  */
 package org.apache.bookkeeper.tools.cli.helpers;
 
+import java.net.URI;
 import java.util.Optional;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -26,8 +27,9 @@
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.discover.RegistrationClient;
+import org.apache.bookkeeper.meta.MetadataClientDriver;
+import org.apache.bookkeeper.meta.MetadataDrivers;
 import org.apache.bookkeeper.stats.NullStatsLogger;
-import org.apache.bookkeeper.util.ReflectionUtils;
 
 /**
  * This is a mixin for commands that talks to discovery service.
@@ -37,21 +39,16 @@
 
     @Override
     public void run(ServerConfiguration conf) throws Exception {
-        // cast the server configuration to a client configuration object.
-        ClientConfiguration clientConf = new ClientConfiguration(conf);
-        run(clientConf);
-    }
-
-    protected void run(ClientConfiguration conf) throws Exception {
-        Class<? extends RegistrationClient> regClientCls = 
conf.getRegistrationClientClass();
+        URI metadataServiceUri = URI.create(conf.getMetadataServiceUri());
         @Cleanup("shutdown") ScheduledExecutorService executor = 
Executors.newSingleThreadScheduledExecutor();
-        try (RegistrationClient regClient = 
ReflectionUtils.newInstance(regClientCls)) {
-            regClient.initialize(
-                conf,
+        try (MetadataClientDriver driver = 
MetadataDrivers.getClientDriver(metadataServiceUri)) {
+            ClientConfiguration clientConf = new ClientConfiguration(conf);
+            driver.initialize(
+                clientConf,
                 executor,
                 NullStatsLogger.INSTANCE,
                 Optional.empty());
-            run(regClient);
+            run(driver.getRegistrationClient());
         }
     }
 
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTestClient.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTestClient.java
index 4981159c0..253f0d909 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTestClient.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTestClient.java
@@ -26,6 +26,7 @@
 
 import lombok.extern.slf4j.Slf4j;
 
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.discover.RegistrationClient.RegistrationListener;
 import org.apache.bookkeeper.net.BookieSocketAddress;
@@ -83,7 +84,7 @@ public BookieClient getBookieClient() {
      * in the other set before completing.
      */
     private Future<?> waitForBookieInSet(BookieSocketAddress b,
-                                                       boolean writable) {
+                                                       boolean writable) 
throws Exception {
         log.info("Wait for {} to become {}",
                  b, writable ? "writable" : "readonly");
 
@@ -103,9 +104,32 @@ public BookieClient getBookieClient() {
             }
         };
 
-        regClient.watchWritableBookies(writableListener);
-        regClient.watchReadOnlyBookies(readOnlyListener);
-        return CompletableFuture.allOf(writableFuture, readOnlyFuture);
+        
getMetadataClientDriver().getRegistrationClient().watchWritableBookies(writableListener);
+        
getMetadataClientDriver().getRegistrationClient().watchReadOnlyBookies(readOnlyListener);
+
+        if (writable) {
+            return writableFuture
+                .thenCompose(ignored -> 
getMetadataClientDriver().getRegistrationClient().getReadOnlyBookies())
+                .thenCompose(readonlyBookies -> {
+                    if (readonlyBookies.getValue().contains(b)) {
+                        // if the bookie still shows up at readonly path, wait 
for it to disappear
+                        return readOnlyFuture;
+                    } else {
+                        return FutureUtils.Void();
+                    }
+                });
+        } else {
+            return readOnlyFuture
+                .thenCompose(ignored -> 
getMetadataClientDriver().getRegistrationClient().getWritableBookies())
+                .thenCompose(writableBookies -> {
+                    if (writableBookies.getValue().contains(b)) {
+                        // if the bookie still shows up at writable path, wait 
for it to disappear
+                        return writableFuture;
+                    } else {
+                        return FutureUtils.Void();
+                    }
+                });
+        }
     }
 
     public TestStatsProvider getTestStatsProvider() {
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
index 238992998..76a518021 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
@@ -46,6 +46,12 @@
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory;
 import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.meta.LedgerManagerFactory;
+import org.apache.bookkeeper.meta.MetadataDrivers;
+import org.apache.bookkeeper.meta.exceptions.Code;
+import org.apache.bookkeeper.meta.exceptions.MetadataException;
+import org.apache.bookkeeper.meta.zk.ZKMetadataBookieDriver;
+import org.apache.bookkeeper.meta.zk.ZKMetadataClientDriver;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookieProtocol;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
@@ -58,6 +64,7 @@
 import org.apache.bookkeeper.versioning.Version;
 import org.apache.zookeeper.AsyncCallback.VoidCallback;
 import org.apache.zookeeper.KeeperException;
+import org.junit.After;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -156,15 +163,71 @@ public LedgerManager newLedgerManager() {
         }
     }
 
+    /**
+     * ZooKeeper client driver that hands out a {@link TestLedgerManagerFactory}
+     * instead of the factory the parent driver would resolve. The factory is
+     * created on the first call and reused afterwards.
+     */
+    static class TestMetadataClientDriver extends ZKMetadataClientDriver {
+
+        @Override
+        public synchronized LedgerManagerFactory getLedgerManagerFactory() throws MetadataException {
+            if (null != lmFactory) {
+                // already created by an earlier call
+                return lmFactory;
+            }
+            try {
+                TestLedgerManagerFactory testFactory = new TestLedgerManagerFactory();
+                lmFactory = testFactory.initialize(
+                    conf, layoutManager, TestLedgerManagerFactory.CUR_VERSION);
+            } catch (IOException e) {
+                throw new MetadataException(Code.METADATA_SERVICE_ERROR, e);
+            }
+            return lmFactory;
+        }
+    }
+
+    /**
+     * ZooKeeper bookie driver that hands out a {@link TestLedgerManagerFactory}
+     * instead of the factory the parent driver would resolve. The factory is
+     * created on the first call and reused afterwards.
+     */
+    static class TestMetadataBookieDriver extends ZKMetadataBookieDriver {
+
+        @Override
+        public synchronized LedgerManagerFactory getLedgerManagerFactory() throws MetadataException {
+            if (null != lmFactory) {
+                // already created by an earlier call
+                return lmFactory;
+            }
+            try {
+                TestLedgerManagerFactory testFactory = new TestLedgerManagerFactory();
+                lmFactory = testFactory.initialize(
+                    conf, layoutManager, TestLedgerManagerFactory.CUR_VERSION);
+            } catch (IOException e) {
+                throw new MetadataException(Code.METADATA_SERVICE_ERROR, e);
+            }
+            return lmFactory;
+        }
+    }
+
     final DigestType digestType;
 
-    public ParallelLedgerRecoveryTest() {
+    public ParallelLedgerRecoveryTest() throws Exception {
         super(3);
+
+        this.digestType = DigestType.CRC32;
+    }
+
+    @Override
+    protected void startBKCluster() throws Exception {
+        MetadataDrivers.registerClientDriver("zk", 
TestMetadataClientDriver.class, true);
+        MetadataDrivers.registerBookieDriver("zk", 
TestMetadataBookieDriver.class, true);
+        baseConf.setMetadataServiceUri(
+            "zk://" + zkUtil.getZooKeeperConnectString() + 
baseConf.getZkLedgersRootPath());
         baseConf.setLedgerManagerFactoryClass(TestLedgerManagerFactory.class);
+        baseClientConf.setMetadataServiceUri(
+            "zk://" + zkUtil.getZooKeeperConnectString() + 
baseConf.getZkLedgersRootPath());
         
baseClientConf.setLedgerManagerFactoryClass(TestLedgerManagerFactory.class);
         baseClientConf.setReadEntryTimeout(60000);
         baseClientConf.setAddEntryTimeout(60000);
-        this.digestType = DigestType.CRC32;
+
+        super.startBKCluster();
+    }
+
+    @After
+    @Override
+    public void tearDown() throws Exception {
+        try {
+            super.tearDown();
+        } finally {
+            MetadataDrivers.registerClientDriver("zk", 
ZKMetadataClientDriver.class, true);
+            MetadataDrivers.registerBookieDriver("zk", 
ZKMetadataBookieDriver.class, true);
+        }
     }
 
     @Test
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/conf/AbstractConfigurationTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/conf/AbstractConfigurationTest.java
index 70b9cb3f2..0d25ccce7 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/conf/AbstractConfigurationTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/conf/AbstractConfigurationTest.java
@@ -39,6 +39,8 @@
         "zk+flat://127.0.0.1/path/to/ledgers";
     private static final String LONGHIERARCHICAL_METADATA_SERVICE_URI =
         "zk+longhierarchical://127.0.0.1/path/to/ledgers";
+    private static final String MS_METADATA_SERVICE_URI =
+        "zk+ms://127.0.0.1/path/to/ledgers";
 
     private AbstractConfiguration conf;
 
@@ -95,6 +97,16 @@ public void testLongHierarchicalLedgerManagerUri() throws 
Exception {
             conf.getMetadataServiceUri());
     }
 
+    /**
+     * Selecting the MS ledger manager factory must yield a {@code zk+ms} metadata service uri.
+     */
+    @SuppressWarnings({ "unchecked", "deprecation" })
+    @Test
+    public void testMsLedgerManagerUri() throws Exception {
+        conf.setLedgerManagerFactoryClass(org.apache.bookkeeper.meta.MSLedgerManagerFactory.class);
+
+        String actualUri = conf.getMetadataServiceUri();
+        assertEquals(MS_METADATA_SERVICE_URI, actualUri);
+    }
+
     @SuppressWarnings({ "unchecked" })
     @Test(expected = IllegalArgumentException.class)
     public void testUnknownZkLedgerManagerFactory() throws Exception {
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/discover/TestZkRegistrationClient.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/discover/TestZkRegistrationClient.java
index 385d8028b..99336b478 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/discover/TestZkRegistrationClient.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/discover/TestZkRegistrationClient.java
@@ -18,56 +18,521 @@
  */
 package org.apache.bookkeeper.discover;
 
+import static org.apache.bookkeeper.common.concurrent.FutureUtils.collect;
+import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
+import static org.apache.bookkeeper.common.testing.MoreAsserts.assertSetEquals;
+import static 
org.apache.bookkeeper.discover.ZKRegistrationClient.ZK_CONNECT_BACKOFF_MS;
+import static org.apache.bookkeeper.util.BookKeeperConstants.AVAILABLE_NODE;
+import static org.apache.bookkeeper.util.BookKeeperConstants.READONLY;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.same;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-import org.apache.bookkeeper.meta.LayoutManager;
-import org.apache.bookkeeper.meta.LedgerLayout;
+import com.google.common.collect.Lists;
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.BKException.ZKException;
+import org.apache.bookkeeper.common.testing.executors.MockExecutorController;
+import org.apache.bookkeeper.discover.RegistrationClient.RegistrationListener;
+import org.apache.bookkeeper.discover.ZKRegistrationClient.WatchTask;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.bookkeeper.versioning.LongVersion;
+import org.apache.bookkeeper.versioning.Versioned;
+import org.apache.bookkeeper.zookeeper.MockZooKeeperTestCase;
+import org.apache.zookeeper.AsyncCallback.Children2Callback;
+import org.apache.zookeeper.AsyncCallback.VoidCallback;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.Watcher.WatcherType;
+import org.apache.zookeeper.data.Stat;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
 
 /**
  * Unit test of {@link RegistrationClient}.
  */
-public class TestZkRegistrationClient {
-    private final LedgerLayout ledgerLayout;
-    private final LayoutManager layoutManager;
-    private final ZKRegistrationClient zkRegistrationClient;
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ ZKRegistrationClient.class, ZkUtils.class })
+@Slf4j
+public class TestZkRegistrationClient extends MockZooKeeperTestCase {
 
-    public TestZkRegistrationClient() {
-        this.ledgerLayout = mock(LedgerLayout.class);
-        this.layoutManager = mock(LayoutManager.class);
-        this.zkRegistrationClient = new ZKRegistrationClient();
-        zkRegistrationClient.setLayoutManager(layoutManager);
+    @Rule
+    public final TestName runtime = new TestName();
+
+    private String ledgersPath;
+    private String regPath;
+    private String regReadonlyPath;
+    private ZKRegistrationClient zkRegistrationClient;
+    private ScheduledExecutorService mockExecutor;
+    private MockExecutorController controller;
+
+    @Before
+    public void setup() throws Exception {
+        super.setup();
+
+        // paths are derived from the running test's method name
+        ledgersPath = "/" + runtime.getMethodName();
+        regPath = ledgersPath + "/" + AVAILABLE_NODE;
+        regReadonlyPath = regPath + "/" + READONLY;
+
+        // mocked scheduler, wired through a MockExecutorController
+        mockExecutor = mock(ScheduledExecutorService.class);
+        controller = new MockExecutorController()
+            .controlExecute(mockExecutor)
+            .controlSubmit(mockExecutor)
+            .controlSchedule(mockExecutor)
+            .controlScheduleAtFixedRate(mockExecutor, 10);
+
+        // client under test, built on the mocked zookeeper handle and scheduler
+        zkRegistrationClient = new ZKRegistrationClient(mockZk, ledgersPath, mockExecutor);
+    }
+
+    @After
+    public void teardown() {
+        // nothing to release if the client was never assigned
+        if (zkRegistrationClient == null) {
+            return;
+        }
+        zkRegistrationClient.close();
+    }
+
+    /**
+     * Build {@code num} bookie addresses on 127.0.0.1, using consecutive ports starting at 3181.
+     */
+    private static Set<BookieSocketAddress> prepareNBookies(int num) {
+        Set<BookieSocketAddress> result = new HashSet<>();
+        int offset = 0;
+        while (offset < num) {
+            result.add(new BookieSocketAddress("127.0.0.1", 3181 + offset));
+            offset++;
+        }
+        return result;
+    }
+
+    @Test
+    public void testGetWritableBookies() throws Exception {
+        Set<BookieSocketAddress> addresses = prepareNBookies(10);
+        List<String> children = Lists.newArrayList();
+        for (BookieSocketAddress address : addresses) {
+            children.add(address.toString());
+        }
+        Stat stat = mock(Stat.class);
+        when(stat.getCversion()).thenReturn(1234);
+        mockGetChildren(
+            regPath, false,
+            Code.OK.intValue(), children, stat);
+
+        Versioned<Set<BookieSocketAddress>> result =
+            result(zkRegistrationClient.getWritableBookies());
+
+        assertEquals(new LongVersion(1234), result.getVersion());
+        assertSetEquals(
+            addresses, result.getValue());
     }
 
     @Test
-    public void testGetLayoutManager() throws Exception {
-        assertEquals(layoutManager, zkRegistrationClient.getLayoutManager());
+    public void testGetReadOnlyBookies() throws Exception {
+        Set<BookieSocketAddress> addresses = prepareNBookies(10);
+        List<String> children = Lists.newArrayList();
+        for (BookieSocketAddress address : addresses) {
+            children.add(address.toString());
+        }
+        Stat stat = mock(Stat.class);
+        when(stat.getCversion()).thenReturn(1234);
+        mockGetChildren(
+            regReadonlyPath, false,
+            Code.OK.intValue(), children, stat);
+
+        Versioned<Set<BookieSocketAddress>> result =
+            result(zkRegistrationClient.getReadOnlyBookies());
+
+        assertEquals(new LongVersion(1234), result.getVersion());
+        assertSetEquals(
+            addresses, result.getValue());
     }
 
     @Test
-    public void testReadLedgerLayout() throws Exception {
-        when(layoutManager.readLedgerLayout()).thenReturn(ledgerLayout);
-        assertEquals(ledgerLayout, 
zkRegistrationClient.getLayoutManager().readLedgerLayout());
+    public void testGetWritableBookiesFailure() throws Exception {
+        mockGetChildren(
+            regPath, false,
+            Code.NONODE.intValue(), null, null);
+
+        try {
+            result(zkRegistrationClient.getWritableBookies());
+            fail("Should fail to get writable bookies");
+        } catch (ZKException zke) {
+            // expected to throw zookeeper exception
+        }
     }
 
     @Test
-    public void testStoreLedgerLayout() throws Exception {
-        
zkRegistrationClient.getLayoutManager().storeLedgerLayout(ledgerLayout);
+    public void testGetReadOnlyBookiesFailure() throws Exception {
+        mockGetChildren(
+            regReadonlyPath, false,
+            Code.NONODE.intValue(), null, null);
 
-        verify(layoutManager, times(1))
-            .storeLedgerLayout(eq(ledgerLayout));
+        try {
+            result(zkRegistrationClient.getReadOnlyBookies());
+            fail("Should fail to get readonly bookies");
+        } catch (ZKException zke) {
+            // expected: the mocked NONODE result on the readonly path surfaces as a ZKException
+        }
     }
 
     @Test
-    public void testDeleteLedgerLayout() throws Exception {
-        zkRegistrationClient.getLayoutManager().deleteLedgerLayout();
+    public void testWatchWritableBookiesSuccess() throws Exception {
+        testWatchBookiesSuccess(true);
+    }
 
-        verify(layoutManager, times(1))
-            .deleteLedgerLayout();
+    @Test
+    public void testWatchReadonlyBookiesSuccess() throws Exception {
+        testWatchBookiesSuccess(false);
     }
+
+    @SuppressWarnings("unchecked")
+    private void testWatchBookiesSuccess(boolean isWritable)
+            throws Exception {
+        //
+        // 1. test watch bookies with a listener
+        //
+
+        LinkedBlockingQueue<Versioned<Set<BookieSocketAddress>>> updates =
+            spy(new LinkedBlockingQueue<>());
+        RegistrationListener listener = bookies -> {
+            try {
+                updates.put(bookies);
+            } catch (InterruptedException e) {
+                log.warn("Interrupted on enqueue bookie updates", e);
+            }
+        };
+
+        Set<BookieSocketAddress> addresses = prepareNBookies(10);
+        List<String> children = Lists.newArrayList();
+        for (BookieSocketAddress address : addresses) {
+            children.add(address.toString());
+        }
+        Stat stat = mock(Stat.class);
+        when(stat.getCversion()).thenReturn(1234);
+
+        mockGetChildren(
+            isWritable ? regPath : regReadonlyPath,
+            true,
+            Code.OK.intValue(), children, stat);
+
+        if (isWritable) {
+            result(zkRegistrationClient.watchWritableBookies(listener));
+        }  else {
+            result(zkRegistrationClient.watchReadOnlyBookies(listener));
+        }
+
+        Versioned<Set<BookieSocketAddress>> update = updates.take();
+        verify(updates, times(1)).put(any(Versioned.class));
+        assertEquals(new LongVersion(1234), update.getVersion());
+        assertSetEquals(
+            addresses, update.getValue());
+
+        verify(mockZk, times(1))
+            .getChildren(anyString(), any(Watcher.class), 
any(Children2Callback.class), any());
+
+        //
+        // 2. test watch bookies with a second listener. the second listener 
returns cached bookies
+        //    without calling `getChildren` again
+        //
+
+        // register another listener
+        LinkedBlockingQueue<Versioned<Set<BookieSocketAddress>>> secondUpdates 
=
+            spy(new LinkedBlockingQueue<>());
+        RegistrationListener secondListener = bookies -> {
+            try {
+                secondUpdates.put(bookies);
+            } catch (InterruptedException e) {
+                log.warn("Interrupted on enqueue bookie updates", e);
+            }
+        };
+        if (isWritable) {
+            result(zkRegistrationClient.watchWritableBookies(secondListener));
+        } else {
+            result(zkRegistrationClient.watchReadOnlyBookies(secondListener));
+        }
+        Versioned<Set<BookieSocketAddress>> secondListenerUpdate = 
secondUpdates.take();
+        // first listener will not be notified with any update
+        verify(updates, times(1)).put(any(Versioned.class));
+        // second listener will receive same update as the first listener 
received before
+        verify(secondUpdates, times(1)).put(any(Versioned.class));
+        assertSame(update.getVersion(), secondListenerUpdate.getVersion());
+        assertSame(update.getValue(), secondListenerUpdate.getValue());
+
+        // the second listener will return the cached value without issuing 
another getChildren call
+        verify(mockZk, times(1))
+            .getChildren(anyString(), any(Watcher.class), 
any(Children2Callback.class), any());
+
+        //
+        // 3. simulate session expire, it will trigger watcher to refetch 
bookies again.
+        //    but since there is no updates on bookies, the registered 
listeners will not be notified.
+        //
+
+        notifyWatchedEvent(
+            EventType.None,
+            KeeperState.Expired,
+            isWritable ? regPath : regReadonlyPath);
+
+        // if session expires, the watcher task will get into backoff state
+        controller.advance(Duration.ofMillis(ZK_CONNECT_BACKOFF_MS));
+
+        // the same bookies are returned, so the getChildren call count increases to 2,
+        // but since there are no updates, no notification is sent.
+        verify(mockZk, times(2))
+            .getChildren(anyString(), any(Watcher.class), 
any(Children2Callback.class), any());
+        assertNull(updates.poll());
+        // both listener and secondListener will not receive any old update
+        verify(updates, times(1)).put(any(Versioned.class));
+        verify(secondUpdates, times(1)).put(any(Versioned.class));
+
+        //
+        // 4. notify with new bookies. both listeners will be notified with 
new bookies.
+        //
+
+        Set<BookieSocketAddress> newAddresses = prepareNBookies(20);
+        List<String> newChildren = Lists.newArrayList();
+        for (BookieSocketAddress address : newAddresses) {
+            newChildren.add(address.toString());
+        }
+        Stat newStat = mock(Stat.class);
+        when(newStat.getCversion()).thenReturn(1235);
+
+        mockGetChildren(
+            isWritable ? regPath : regReadonlyPath,
+            true,
+            Code.OK.intValue(), newChildren, newStat);
+
+        // trigger watcher
+        notifyWatchedEvent(
+            EventType.NodeChildrenChanged,
+            KeeperState.SyncConnected,
+            isWritable ? regPath : regReadonlyPath);
+
+        update = updates.take();
+        assertEquals(new LongVersion(1235), update.getVersion());
+        assertSetEquals(
+            newAddresses, update.getValue());
+        secondListenerUpdate = secondUpdates.take();
+        assertSame(update.getVersion(), secondListenerUpdate.getVersion());
+        assertSame(update.getValue(), secondListenerUpdate.getValue());
+
+        verify(mockZk, times(3))
+            .getChildren(anyString(), any(Watcher.class), 
any(Children2Callback.class), any());
+        verify(updates, times(2)).put(any(Versioned.class));
+        verify(secondUpdates, times(2)).put(any(Versioned.class));
+
+        //
+        // 5. unwatch the second listener and notify with new bookies again. 
only first listener will
+        //    be notified with new bookies.
+        //
+
+        newAddresses = prepareNBookies(25);
+        newChildren.clear();
+        newChildren = Lists.newArrayList();
+        for (BookieSocketAddress address : newAddresses) {
+            newChildren.add(address.toString());
+        }
+        newStat = mock(Stat.class);
+        when(newStat.getCversion()).thenReturn(1236);
+
+        mockGetChildren(
+            isWritable ? regPath : regReadonlyPath,
+            true,
+            Code.OK.intValue(), newChildren, newStat);
+
+        if (isWritable) {
+            assertEquals(2, 
zkRegistrationClient.getWatchWritableBookiesTask().getNumListeners());
+            zkRegistrationClient.unwatchWritableBookies(secondListener);
+            assertEquals(1, 
zkRegistrationClient.getWatchWritableBookiesTask().getNumListeners());
+        } else {
+            assertEquals(2, 
zkRegistrationClient.getWatchReadOnlyBookiesTask().getNumListeners());
+            zkRegistrationClient.unwatchReadOnlyBookies(secondListener);
+            assertEquals(1, 
zkRegistrationClient.getWatchReadOnlyBookiesTask().getNumListeners());
+        }
+        // the watch task will not be closed since there is still a listener
+        verify(mockZk, times(0))
+            .removeWatches(
+                eq(isWritable ? regPath : regReadonlyPath),
+                same(isWritable ? 
zkRegistrationClient.getWatchWritableBookiesTask()
+                    : zkRegistrationClient.getWatchReadOnlyBookiesTask()),
+                eq(WatcherType.Children),
+                eq(true),
+                any(VoidCallback.class),
+                any()
+            );
+
+        // trigger watcher
+        notifyWatchedEvent(
+            EventType.NodeChildrenChanged,
+            KeeperState.SyncConnected,
+            isWritable ? regPath : regReadonlyPath);
+
+        update = updates.take();
+        assertEquals(new LongVersion(1236), update.getVersion());
+        assertSetEquals(
+            newAddresses, update.getValue());
+        secondListenerUpdate = secondUpdates.poll();
+        assertNull(secondListenerUpdate);
+
+        verify(mockZk, times(4))
+            .getChildren(anyString(), any(Watcher.class), 
any(Children2Callback.class), any());
+        verify(updates, times(3)).put(any(Versioned.class));
+        verify(secondUpdates, times(2)).put(any(Versioned.class));
+
+        //
+        // 6. unwatch the first listener. the watch task will be closed and zk 
watcher will be removed.
+        //
+        //
+
+        WatchTask expectedWatcher;
+        if (isWritable) {
+            expectedWatcher = 
zkRegistrationClient.getWatchWritableBookiesTask();
+            assertFalse(expectedWatcher.isClosed());
+            zkRegistrationClient.unwatchWritableBookies(listener);
+            assertNull(zkRegistrationClient.getWatchWritableBookiesTask());
+        } else {
+            expectedWatcher = 
zkRegistrationClient.getWatchReadOnlyBookiesTask();
+            assertFalse(expectedWatcher.isClosed());
+            zkRegistrationClient.unwatchReadOnlyBookies(listener);
+            assertNull(zkRegistrationClient.getWatchReadOnlyBookiesTask());
+        }
+        // the watch task will be closed since the last listener has been removed
+        assertTrue(expectedWatcher.isClosed());
+        verify(mockZk, times(1))
+            .removeWatches(
+                eq(isWritable ? regPath : regReadonlyPath),
+                same(expectedWatcher),
+                eq(WatcherType.Children),
+                eq(true),
+                any(VoidCallback.class),
+                any()
+            );
+    }
+
+    @Test
+    public void testWatchWritableBookiesTwice() throws Exception {
+        testWatchBookiesTwice(true);
+    }
+
+    @Test
+    public void testWatchReadonlyBookiesTwice() throws Exception {
+        testWatchBookiesTwice(false);
+    }
+
+    private void testWatchBookiesTwice(boolean isWritable)
+            throws Exception {
+        int zkCallbackDelayMs = 100;
+
+        Set<BookieSocketAddress> addresses = prepareNBookies(10);
+        List<String> children = Lists.newArrayList();
+        for (BookieSocketAddress address : addresses) {
+            children.add(address.toString());
+        }
+        Stat stat = mock(Stat.class);
+        when(stat.getCversion()).thenReturn(1234);
+
+        mockGetChildren(
+            isWritable ? regPath : regReadonlyPath,
+            true,
+            Code.OK.intValue(), children, stat, zkCallbackDelayMs);
+
+        CompletableFuture<Versioned<Set<BookieSocketAddress>>> firstResult = 
new CompletableFuture<>();
+        RegistrationListener firstListener = bookies -> 
firstResult.complete(bookies);
+
+        CompletableFuture<Versioned<Set<BookieSocketAddress>>> secondResult = 
new CompletableFuture<>();
+        RegistrationListener secondListener = bookies -> 
secondResult.complete(bookies);
+
+        List<CompletableFuture<Void>> watchFutures = 
Lists.newArrayListWithExpectedSize(2);
+        if (isWritable) {
+            
watchFutures.add(zkRegistrationClient.watchWritableBookies(firstListener));
+            
watchFutures.add(zkRegistrationClient.watchWritableBookies(secondListener));
+        }  else {
+            
watchFutures.add(zkRegistrationClient.watchReadOnlyBookies(firstListener));
+            
watchFutures.add(zkRegistrationClient.watchReadOnlyBookies(secondListener));
+        }
+
+        // trigger zkCallbackExecutor to execute getChildren callback
+        zkCallbackController.advance(Duration.ofMillis(zkCallbackDelayMs));
+
+        result(collect(watchFutures));
+        assertEquals(firstResult.get().getVersion(), 
secondResult.get().getVersion());
+        assertSetEquals(firstResult.get().getValue(), 
secondResult.get().getValue());
+    }
+
+    @Test
+    public void testWatchWritableBookiesFailure() throws Exception {
+        testWatchBookiesFailure(true);
+    }
+
+    @Test
+    public void testWatchReadonlyBookiesFailure() throws Exception {
+        testWatchBookiesFailure(false);
+    }
+
+    private void testWatchBookiesFailure(boolean isWritable)
+            throws Exception {
+        int zkCallbackDelayMs = 100;
+
+        mockGetChildren(
+            isWritable ? regPath : regReadonlyPath,
+            true,
+            Code.NONODE.intValue(), null, null, zkCallbackDelayMs);
+
+        CompletableFuture<Versioned<Set<BookieSocketAddress>>> listenerResult = new CompletableFuture<>();
+        RegistrationListener listener = bookies -> listenerResult.complete(bookies);
+
+        CompletableFuture<Void> watchFuture;
+
+        WatchTask watchTask;
+        if (isWritable) {
+            watchFuture = zkRegistrationClient.watchWritableBookies(listener);
+            watchTask = zkRegistrationClient.getWatchWritableBookiesTask();
+        } else {
+            watchFuture = zkRegistrationClient.watchReadOnlyBookies(listener);
+            watchTask = zkRegistrationClient.getWatchReadOnlyBookiesTask();
+        }
+        assertNotNull(watchTask);
+        assertEquals(1, watchTask.getNumListeners());
+
+        // trigger zkCallbackExecutor to execute getChildren callback
+        zkCallbackController.advance(Duration.ofMillis(zkCallbackDelayMs));
+
+        try {
+            result(watchFuture);
+            fail("Should fail to watch bookies if reg path doesn't exist");
+        } catch (ZKException zke) {
+            // expected: the mocked NONODE result fails the watch future for either path
+        }
+        assertEquals(0, watchTask.getNumListeners());
+        assertTrue(watchTask.isClosed());
+        if (isWritable) {
+            assertNull(zkRegistrationClient.getWatchWritableBookiesTask());
+        } else {
+            assertNull(zkRegistrationClient.getWatchReadOnlyBookiesTask());
+        }
+    }
+
 }
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
index deecc197a..7a28079b8 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
@@ -95,7 +95,6 @@ public LedgerIdGenerator getLedgerIdGenerator() throws 
IOException {
         return Arrays.asList(new Object[][] {
             { FlatLedgerManagerFactory.class },
             { HierarchicalLedgerManagerFactory.class },
-            { LegacyHierarchicalLedgerManagerFactory.class },
             { LongHierarchicalLedgerManagerFactory.class },
             { MSLedgerManagerFactory.class },
         });
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/MetadataDriversTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/MetadataDriversTest.java
index bd89fb3df..90207d88c 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/MetadataDriversTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/MetadataDriversTest.java
@@ -67,7 +67,7 @@ public String getScheme() {
         }
 
         @Override
-        public RegistrationClient getRegistrationClient() throws 
MetadataException {
+        public RegistrationClient getRegistrationClient() {
             return mock(RegistrationClient.class);
         }
 
@@ -76,6 +76,11 @@ public LedgerManagerFactory getLedgerManagerFactory() throws 
MetadataException {
             return mock(LedgerManagerFactory.class);
         }
 
+        @Override
+        public LayoutManager getLayoutManager() {
+            return mock(LayoutManager.class);
+        }
+
         @Override
         public void close() {
         }
@@ -97,7 +102,7 @@ public String getScheme() {
         }
 
         @Override
-        public RegistrationClient getRegistrationClient() throws 
MetadataException {
+        public RegistrationClient getRegistrationClient() {
             return mock(RegistrationClient.class);
         }
 
@@ -106,6 +111,11 @@ public LedgerManagerFactory getLedgerManagerFactory() 
throws MetadataException {
             return mock(LedgerManagerFactory.class);
         }
 
+        @Override
+        public LayoutManager getLayoutManager() {
+            return mock(LayoutManager.class);
+        }
+
         @Override
         public void close() {
         }
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/zk/ZKMetadataClientDriverTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/zk/ZKMetadataClientDriverTest.java
index 4cdf11d1e..626a05581 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/zk/ZKMetadataClientDriverTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/zk/ZKMetadataClientDriverTest.java
@@ -20,8 +20,9 @@
 
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.ArgumentMatchers.same;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -33,6 +34,7 @@
 import org.apache.bookkeeper.discover.ZKRegistrationClient;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
+import org.apache.zookeeper.ZooKeeper;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -70,7 +72,8 @@ public void testGetRegClient() throws Exception {
         ZKRegistrationClient mockRegClient = 
PowerMockito.mock(ZKRegistrationClient.class);
 
         PowerMockito.whenNew(ZKRegistrationClient.class)
-            .withNoArguments()
+            .withParameterTypes(ZooKeeper.class, String.class, 
ScheduledExecutorService.class)
+            .withArguments(any(ZooKeeper.class), anyString(), 
any(ScheduledExecutorService.class))
             .thenReturn(mockRegClient);
 
         RegistrationClient client = driver.getRegistrationClient();
@@ -78,13 +81,7 @@ public void testGetRegClient() throws Exception {
         assertSame(mockRegClient, driver.regClient);
 
         PowerMockito.verifyNew(ZKRegistrationClient.class, times(1))
-            .withNoArguments();
-        verify(mockRegClient, times(1))
-            .initialize(
-                same(conf),
-                same(mockExecutor),
-                same(NullStatsLogger.INSTANCE),
-                eq(Optional.of(driver.zk)));
+            .withArguments(eq(mockZkc), eq(ledgersRootPath), eq(mockExecutor));
 
         driver.close();
         verify(mockRegClient, times(1)).close();
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/zk/ZKMetadataDriverBaseStaticTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/zk/ZKMetadataDriverBaseStaticTest.java
index 2ce4e34fd..0ed3f7d31 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/zk/ZKMetadataDriverBaseStaticTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/zk/ZKMetadataDriverBaseStaticTest.java
@@ -82,6 +82,16 @@ public void testResolveLedgerManagerFactoryFlat() {
         );
     }
 
+    @SuppressWarnings("deprecation")
+    @Test
+    public void testResolveLedgerManagerFactoryMs() {
+        assertEquals(
+            org.apache.bookkeeper.meta.MSLedgerManagerFactory.class,
+            ZKMetadataDriverBase.resolveLedgerManagerFactory(
+                URI.create("zk+ms://127.0.0.1/ledgers"))
+        );
+    }
+
     @Test
     public void testResolveLedgerManagerFactoryHierarchical() {
         assertEquals(
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/zk/ZKMetadataDriverBaseTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/zk/ZKMetadataDriverBaseTest.java
index f1ed0c025..7a477c0ac 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/zk/ZKMetadataDriverBaseTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/zk/ZKMetadataDriverBaseTest.java
@@ -19,6 +19,7 @@
 package org.apache.bookkeeper.meta.zk;
 
 import static org.apache.bookkeeper.util.BookKeeperConstants.AVAILABLE_NODE;
+import static org.apache.bookkeeper.util.BookKeeperConstants.READONLY;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -77,7 +78,7 @@ public void testInitialize() throws Exception {
             driver.ledgersRootPath);
         assertTrue(driver.ownZKHandle);
 
-        String readonlyPath = "/path/to/ledgers/" + AVAILABLE_NODE;
+        String readonlyPath = "/path/to/ledgers/" + AVAILABLE_NODE + "/" + 
READONLY;
         assertSame(mockZkc, driver.zk);
         verifyStatic(ZooKeeperClient.class, times(1));
         ZooKeeperClient.newBuilder();
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/zk/ZKMetadataDriverTestBase.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/zk/ZKMetadataDriverTestBase.java
index 453769426..22d5f1b4f 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/zk/ZKMetadataDriverTestBase.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/zk/ZKMetadataDriverTestBase.java
@@ -38,12 +38,14 @@
 public abstract class ZKMetadataDriverTestBase {
 
     protected AbstractConfiguration<?> conf;
+    protected String ledgersRootPath;
     protected String metadataServiceUri;
     protected ZooKeeperClient.Builder mockZkBuilder;
     protected ZooKeeperClient mockZkc;
 
     public void setup(AbstractConfiguration<?> conf) throws Exception {
-        metadataServiceUri = "zk://127.0.0.1/path/to/ledgers";
+        ledgersRootPath = "/path/to/ledgers";
+        metadataServiceUri = "zk://127.0.0.1" + ledgersRootPath;
         this.conf = conf;
         conf.setMetadataServiceUri(metadataServiceUri);
 
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/zookeeper/MockZooKeeperTestCase.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/zookeeper/MockZooKeeperTestCase.java
index eb282296c..4190bf5c4 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/zookeeper/MockZooKeeperTestCase.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/zookeeper/MockZooKeeperTestCase.java
@@ -26,10 +26,15 @@
 
 import com.google.common.collect.Maps;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.client.api.BKException.Code;
+import org.apache.bookkeeper.common.testing.executors.MockExecutorController;
 import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.zookeeper.AsyncCallback.Children2Callback;
 import org.apache.zookeeper.AsyncCallback.DataCallback;
 import org.apache.zookeeper.AsyncCallback.StatCallback;
 import org.apache.zookeeper.AsyncCallback.StringCallback;
@@ -50,11 +55,20 @@
 
     protected final ConcurrentMap<String, Set<Watcher>> watchers = 
Maps.newConcurrentMap();
     protected ZooKeeper mockZk;
+    protected ScheduledExecutorService zkCallbackExecutor;
+    protected MockExecutorController zkCallbackController;
 
     protected void setup() throws Exception {
         this.mockZk = mock(ZooKeeper.class);
 
         PowerMockito.mockStatic(ZkUtils.class);
+
+        this.zkCallbackExecutor = mock(ScheduledExecutorService.class);
+        this.zkCallbackController = new MockExecutorController()
+            .controlExecute(zkCallbackExecutor)
+            .controlSubmit(zkCallbackExecutor)
+            .controlSchedule(zkCallbackExecutor)
+            .controlScheduleAtFixedRate(zkCallbackExecutor, 10);
     }
 
     private void addWatcher(String path, Watcher watcher) {
@@ -218,4 +232,45 @@ protected boolean notifyWatchedEvent(EventType eventType,
         return true;
     }
 
+    protected void mockGetChildren(String expectedPath,
+                                   boolean expectedWatcher,
+                                   int retCode,
+                                   List<String> retChildren,
+                                   Stat retStat) {
+        mockGetChildren(
+            expectedPath, expectedWatcher, retCode, retChildren, retStat, 0);
+    }
+
+    protected void mockGetChildren(String expectedPath,
+                                   boolean expectedWatcher,
+                                   int retCode,
+                                   List<String> retChildren,
+                                   Stat retStat,
+                                   long delayMs) {
+        doAnswer(invocationOnMock -> {
+            String p = invocationOnMock.getArgument(0);
+            Watcher w = invocationOnMock.getArgument(1);
+            Children2Callback callback = invocationOnMock.getArgument(2);
+            Object ctx = invocationOnMock.getArgument(3);
+
+            if (Code.OK == retCode) {
+                addWatcher(p, w);
+            }
+
+            this.zkCallbackExecutor.schedule(() -> callback.processResult(
+                retCode,
+                p,
+                ctx,
+                retChildren,
+                retStat
+            ), delayMs, TimeUnit.MILLISECONDS);
+            return null;
+
+        }).when(mockZk).getChildren(
+            eq(expectedPath),
+            expectedWatcher ? any(Watcher.class) : eq(null),
+            any(Children2Callback.class),
+            any());
+    }
+
 }
diff --git 
a/bookkeeper-tools/src/test/java/org/apache/bookkeeper/tools/cli/helpers/ClientCommandTestBase.java
 
b/bookkeeper-tools/src/test/java/org/apache/bookkeeper/tools/cli/helpers/ClientCommandTestBase.java
index 1d269f5ec..7d551ed55 100644
--- 
a/bookkeeper-tools/src/test/java/org/apache/bookkeeper/tools/cli/helpers/ClientCommandTestBase.java
+++ 
b/bookkeeper-tools/src/test/java/org/apache/bookkeeper/tools/cli/helpers/ClientCommandTestBase.java
@@ -19,15 +19,19 @@
 package org.apache.bookkeeper.tools.cli.helpers;
 
 import static org.mockito.Answers.CALLS_REAL_METHODS;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.powermock.api.mockito.PowerMockito.spy;
 
+import java.net.URI;
 import org.apache.bookkeeper.client.api.BookKeeper;
 import org.apache.bookkeeper.client.api.BookKeeperBuilder;
 import org.apache.bookkeeper.conf.AbstractConfiguration;
 import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.meta.MetadataClientDriver;
+import org.apache.bookkeeper.meta.MetadataDrivers;
 import org.junit.Before;
 import org.junit.runner.RunWith;
 import org.powermock.api.mockito.PowerMockito;
@@ -38,12 +42,13 @@
  * A test base for testing client commands.
  */
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({ ClientCommand.class, BookKeeper.class })
+@PrepareForTest({ ClientCommand.class, BookKeeper.class, MetadataDrivers.class 
})
 public abstract class ClientCommandTestBase extends CommandTestBase {
 
     protected ClientConfiguration clientConf;
     protected BookKeeperBuilder mockBkBuilder;
     protected BookKeeper mockBk;
+    protected MetadataClientDriver metadataClientDriver;
 
     @Before
     public void setup() throws Exception {
@@ -60,6 +65,12 @@ public void setup() throws Exception {
             BookKeeper.class, "newBuilder", eq(clientConf))
             .thenReturn(mockBkBuilder);
         when(mockBkBuilder.build()).thenReturn(mockBk);
+
+        PowerMockito.mockStatic(MetadataDrivers.class);
+        this.metadataClientDriver = mock(MetadataClientDriver.class);
+        PowerMockito.when(
+            MetadataDrivers.getClientDriver(any(URI.class)))
+            .thenReturn(metadataClientDriver);
     }
 
 }
diff --git 
a/bookkeeper-tools/src/test/java/org/apache/bookkeeper/tools/cli/helpers/DiscoveryCommandTest.java
 
b/bookkeeper-tools/src/test/java/org/apache/bookkeeper/tools/cli/helpers/DiscoveryCommandTest.java
index 1677309e1..f2c6dc6e7 100644
--- 
a/bookkeeper-tools/src/test/java/org/apache/bookkeeper/tools/cli/helpers/DiscoveryCommandTest.java
+++ 
b/bookkeeper-tools/src/test/java/org/apache/bookkeeper/tools/cli/helpers/DiscoveryCommandTest.java
@@ -24,17 +24,19 @@
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
 
+import java.net.URI;
 import java.util.Optional;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
-import org.apache.bookkeeper.client.api.BookKeeper;
 import org.apache.bookkeeper.conf.AbstractConfiguration;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.discover.RegistrationClient;
+import org.apache.bookkeeper.meta.MetadataClientDriver;
+import org.apache.bookkeeper.meta.MetadataDrivers;
 import org.apache.bookkeeper.stats.NullStatsLogger;
-import org.apache.bookkeeper.util.ReflectionUtils;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -46,19 +48,20 @@
  * Unit test of {@link DiscoveryCommand}.
  */
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({ DiscoveryCommand.class, ReflectionUtils.class })
+@PrepareForTest({ DiscoveryCommand.class, MetadataDrivers.class })
 public class DiscoveryCommandTest {
 
     private DiscoveryCommand cmd;
     private ServerConfiguration serverConf;
     private ClientConfiguration clientConf;
     private RegistrationClient regClient;
+    private MetadataClientDriver clientDriver;
     private ScheduledExecutorService executor;
 
     @Before
     public void setup() throws Exception {
         PowerMockito.mockStatic(Executors.class);
-        PowerMockito.mockStatic(ReflectionUtils.class);
+        PowerMockito.mockStatic(MetadataDrivers.class);
 
         this.cmd = mock(DiscoveryCommand.class, CALLS_REAL_METHODS);
 
@@ -74,7 +77,10 @@ public void setup() throws Exception {
             .thenReturn(executor);
 
         this.regClient = mock(RegistrationClient.class);
-        PowerMockito.when(ReflectionUtils.newInstance(any()))
+        this.clientDriver = mock(MetadataClientDriver.class);
+        PowerMockito.when(MetadataDrivers.getClientDriver(any(URI.class)))
+            .thenReturn(clientDriver);
+        when(clientDriver.getRegistrationClient())
             .thenReturn(regClient);
     }
 
@@ -82,9 +88,10 @@ public void setup() throws Exception {
     public void testRun() throws Exception {
         cmd.run(serverConf);
         verify(cmd, times(1)).run(eq(regClient));
-        verify(regClient, times(1))
+        verify(clientDriver, times(1))
             .initialize(eq(clientConf), eq(executor), eq(NullStatsLogger.INSTANCE), eq(Optional.empty()));
-        verify(regClient, times(1)).close();
+        verify(clientDriver, times(1)).getRegistrationClient();
+        verify(clientDriver, times(1)).close();
         verify(executor, times(1)).shutdown();
     }
 
diff --git a/bookkeeper-tools/src/test/java/org/apache/bookkeeper/tools/cli/helpers/DiscoveryCommandTestBase.java b/bookkeeper-tools/src/test/java/org/apache/bookkeeper/tools/cli/helpers/DiscoveryCommandTestBase.java
index 7f3588ebc..a3dfb9d71 100644
--- a/bookkeeper-tools/src/test/java/org/apache/bookkeeper/tools/cli/helpers/DiscoveryCommandTestBase.java
+++ b/bookkeeper-tools/src/test/java/org/apache/bookkeeper/tools/cli/helpers/DiscoveryCommandTestBase.java
@@ -37,7 +37,7 @@
  * A test base for discovery related commands.
  */
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({ DiscoveryCommand.class, ReflectionUtils.class })
+@PrepareForTest({ DiscoveryCommand.class })
 public abstract class DiscoveryCommandTestBase extends ClientCommandTestBase {
 
     protected RegistrationClient regClient;
@@ -48,14 +48,13 @@ public void setup() throws Exception {
         super.setup();
 
         PowerMockito.mockStatic(Executors.class);
-        PowerMockito.mockStatic(ReflectionUtils.class);
 
         this.executor = mock(ScheduledExecutorService.class);
         PowerMockito.when(Executors.newSingleThreadScheduledExecutor())
             .thenReturn(executor);
 
         this.regClient = mock(RegistrationClient.class);
-        PowerMockito.when(ReflectionUtils.newInstance(any()))
+        PowerMockito.when(metadataClientDriver.getRegistrationClient())
             .thenReturn(regClient);
     }
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to