This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 51b93ba  ISSUE #978: decouple metaformat cmd
51b93ba is described below

commit 51b93bad7622878b1f2f3246c9880851731e84af
Author: cguttapalem <[email protected]>
AuthorDate: Wed Jan 24 10:40:44 2018 -0800

    ISSUE #978: decouple metaformat cmd
    
    Descriptions of the changes in this PR:
    
    - introduce new commands - initnewcluster, nukeexistingcluster, whatisinstanceid
    - add testcases for initnewcluster and nukeexistingcluster at BookKeeperAdmin level
    
    Master Issue: #978
    
    Author: cguttapalem <[email protected]>
    
    Reviewers: Jia Zhai <None>, Sijie Guo <[email protected]>
    
    This closes #979 from reddycharan/splitmetaformat, closes #978
---
 .../org/apache/bookkeeper/bookie/BookieShell.java  | 137 +++++++++++-
 .../apache/bookkeeper/client/BookKeeperAdmin.java  |  53 ++++-
 .../apache/bookkeeper/client/BookieWatcher.java    |   3 +-
 .../bookkeeper/discover/RegistrationManager.java   |  24 ++-
 .../bookkeeper/discover/ZKRegistrationManager.java | 231 ++++++++++++++-------
 .../meta/AbstractHierarchicalLedgerManager.java    |   1 -
 .../bookkeeper/meta/AbstractZkLedgerManager.java   |  33 ++-
 .../meta/AbstractZkLedgerManagerFactory.java       |  95 +++++++++
 .../apache/bookkeeper/meta/FlatLedgerManager.java  |  10 +-
 .../bookkeeper/meta/FlatLedgerManagerFactory.java  |  21 +-
 .../bookkeeper/meta/HierarchicalLedgerManager.java |   5 +
 .../meta/HierarchicalLedgerManagerFactory.java     |   4 +
 .../bookkeeper/meta/LedgerManagerFactory.java      |  16 ++
 .../meta/LegacyHierarchicalLedgerManager.java      |  14 +-
 .../LegacyHierarchicalLedgerManagerFactory.java    |  23 +-
 .../meta/LongHierarchicalLedgerManager.java        |  16 +-
 .../bookkeeper/meta/MSLedgerManagerFactory.java    |  65 +++++-
 .../org/apache/bookkeeper/util/StringUtils.java    |   8 +
 .../bookkeeper/client/BookKeeperAdminTest.java     | 206 +++++++++++++++++-
 .../bookkeeper/meta/LedgerManagerIteratorTest.java |  84 +++++++-
 20 files changed, 899 insertions(+), 150 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
index 82ad868..62b4992 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
@@ -140,6 +140,8 @@ public class BookieShell implements Tool {
 
     static final String CMD_METAFORMAT = "metaformat";
     static final String CMD_INITBOOKIE = "initbookie";
+    static final String CMD_INITNEWCLUSTER = "initnewcluster";
+    static final String CMD_NUKEEXISTINGCLUSTER = "nukeexistingcluster";
     static final String CMD_BOOKIEFORMAT = "bookieformat";
     static final String CMD_RECOVER = "recover";
     static final String CMD_LEDGER = "ledger";
@@ -148,6 +150,7 @@ public class BookieShell implements Tool {
     static final String CMD_LEDGERMETADATA = "ledgermetadata";
     static final String CMD_LISTUNDERREPLICATED = "listunderreplicated";
     static final String CMD_WHOISAUDITOR = "whoisauditor";
+    static final String CMD_WHATISINSTANCEID = "whatisinstanceid";
     static final String CMD_SIMPLETEST = "simpletest";
     static final String CMD_BOOKIESANITYTEST = "bookiesanity";
     static final String CMD_READLOG = "readlog";
@@ -273,6 +276,99 @@ public class BookieShell implements Tool {
     }
 
     /**
+     * Initializes new cluster by creating required znodes for the cluster. If
+     * ledgersrootpath is already existing then it will error out. If for any
+     * reason it errors out while creating znodes for the cluster, then before
+     * running initnewcluster again, try nuking existing cluster by running
+     * nukeexistingcluster. This is required because ledgersrootpath znode would
+     * be created after verifying that it doesn't exist, hence during next retry
+     * of initnewcluster it would complain saying that ledgersrootpath is
+     * already existing.
+     */
+    class InitNewCluster extends MyCommand {
+        Options opts = new Options();
+
+        InitNewCluster() {
+            super(CMD_INITNEWCLUSTER);
+        }
+
+        @Override
+        Options getOptions() {
+            return opts;
+        }
+
+        @Override
+        String getDescription() {
+            return "Initializes a new bookkeeper cluster. If initnewcluster 
fails then try nuking "
+                    + "existing cluster by running nukeexistingcluster before 
running initnewcluster again";
+        }
+
+        @Override
+        String getUsage() {
+            return "initnewcluster";
+        }
+
+        @Override
+        int runCmd(CommandLine cmdLine) throws Exception {
+            boolean result = BookKeeperAdmin.initNewCluster(bkConf);
+            return (result) ? 0 : 1;
+        }
+    }
+
+    /**
+     * Nuke bookkeeper metadata of existing cluster in zookeeper.
+     */
+    class NukeExistingCluster extends MyCommand {
+        Options opts = new Options();
+
+        NukeExistingCluster() {
+            super(CMD_NUKEEXISTINGCLUSTER);
+            opts.addOption("p", "zkledgersrootpath", true, "zookeeper ledgers 
rootpath");
+            opts.addOption("i", "instanceid", true, "instanceid");
+            opts.addOption("f", "force", false,
+                    "If instanceid is not specified, "
+                    + "then whether to force nuke the metadata without 
validating instanceid");
+        }
+
+        @Override
+        Options getOptions() {
+            return opts;
+        }
+
+        @Override
+        String getDescription() {
+            return "Nuke bookkeeper cluster by deleting metadata";
+        }
+
+        @Override
+        String getUsage() {
+            return "nukeexistingcluster -zkledgersrootpath <zkledgersrootpath> 
[-instanceid <instanceid> | -force]";
+        }
+
+        @Override
+        int runCmd(CommandLine cmdLine) throws Exception {
+            boolean force = cmdLine.hasOption("f");
+            String zkledgersrootpath = 
cmdLine.getOptionValue("zkledgersrootpath");
+            String instanceid = cmdLine.getOptionValue("instanceid");
+
+            /*
+             * For the NukeExistingCluster command 'zkledgersrootpath' must be provided, and exactly one of
+             * the force option or instanceid must be provided.
+             */
+            if ((zkledgersrootpath == null) || (force == (instanceid != 
null))) {
+                LOG.error(
+                        "zkledgersrootpath should be specified and either 
force option "
+                        + "or instanceid should be specified (but not both)");
+                printUsage();
+                return -1;
+            }
+
+            boolean result = BookKeeperAdmin.nukeExistingCluster(bkConf, 
zkledgersrootpath, instanceid, force);
+            return (result) ? 0 : 1;
+        }
+    }
+
+    /**
      * Formats the local data present in current bookie server.
      */
     class BookieFormatCmd extends MyCommand {
@@ -1704,6 +1800,42 @@ public class BookieShell implements Tool {
     }
 
     /**
+     * Prints the instanceid of the cluster.
+     */
+    class WhatIsInstanceId extends MyCommand {
+        Options opts = new Options();
+
+        public WhatIsInstanceId() {
+            super(CMD_WHATISINSTANCEID);
+        }
+
+        @Override
+        Options getOptions() {
+            return opts;
+        }
+
+        @Override
+        String getDescription() {
+            return "Print the instanceid of the cluster";
+        }
+
+        @Override
+        String getUsage() {
+            return "whatisinstanceid";
+        }
+
+        @Override
+        int runCmd(CommandLine cmdLine) throws Exception {
+            try (RegistrationManager rm = 
RegistrationManager.instantiateRegistrationManager(bkConf)) {
+                String readInstanceId = rm.getClusterInstanceId();
+                LOG.info("ZKServers: {} ZkLedgersRootPath: {} InstanceId: {}", 
bkConf.getZkServers(),
+                        bkConf.getZkLedgersRootPath(), readInstanceId);
+            }
+            return 0;
+        }
+    }
+
+    /**
      * Update cookie command.
      */
     class UpdateCookieCmd extends MyCommand {
@@ -2019,7 +2151,7 @@ public class BookieShell implements Tool {
 
         @Override
         String getUsage() {
-            return "updateledger -bookieId <hostname|ip> [-updatespersec N] 
[-limit N] [-verbose true/false] "
+            return "updateledgers -bookieId <hostname|ip> [-updatespersec N] 
[-limit N] [-verbose true/false] "
                     + "[-printprogress N]";
         }
 
@@ -2562,6 +2694,8 @@ public class BookieShell implements Tool {
     {
         commands.put(CMD_METAFORMAT, new MetaFormatCmd());
         commands.put(CMD_INITBOOKIE, new InitBookieCmd());
+        commands.put(CMD_INITNEWCLUSTER, new InitNewCluster());
+        commands.put(CMD_NUKEEXISTINGCLUSTER, new NukeExistingCluster());
         commands.put(CMD_BOOKIEFORMAT, new BookieFormatCmd());
         commands.put(CMD_RECOVER, new RecoverCmd());
         commands.put(CMD_LEDGER, new LedgerCmd());
@@ -2569,6 +2703,7 @@ public class BookieShell implements Tool {
         commands.put(CMD_LISTLEDGERS, new ListLedgersCmd());
         commands.put(CMD_LISTUNDERREPLICATED, new ListUnderreplicatedCmd());
         commands.put(CMD_WHOISAUDITOR, new WhoIsAuditorCmd());
+        commands.put(CMD_WHATISINSTANCEID, new WhatIsInstanceId());
         commands.put(CMD_LEDGERMETADATA, new LedgerMetadataCmd());
         commands.put(CMD_SIMPLETEST, new SimpleTestCmd());
         commands.put(CMD_BOOKIESANITYTEST, new BookieSanityTestCmd());
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
index 85527d6..a723884 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
@@ -1127,7 +1127,7 @@ public class BookKeeperAdmin implements AutoCloseable {
             boolean isInteractive, boolean force) throws Exception {
 
         try (RegistrationManager rm = 
RegistrationManager.instantiateRegistrationManager(conf)) {
-            boolean ledgerRootExists = rm.prepareFormat(conf);
+            boolean ledgerRootExists = rm.prepareFormat();
 
             // If old data was there then confirm with admin.
             boolean doFormat = true;
@@ -1155,7 +1155,56 @@ public class BookKeeperAdmin implements AutoCloseable {
             BookKeeper bkc = new BookKeeper(new ClientConfiguration(conf));
             bkc.ledgerManagerFactory.format(conf, 
bkc.regClient.getLayoutManager());
 
-            return rm.format(conf);
+            return rm.format();
+        }
+    }
+
+    /**
+     * Initializes new cluster by creating required znodes for the cluster. If
+     * ledgersrootpath is already existing then it will error out.
+     *
+     * @param conf
+     * @return
+     * @throws Exception
+     */
+    public static boolean initNewCluster(ServerConfiguration conf) throws 
Exception {
+        try (RegistrationManager rm = 
RegistrationManager.instantiateRegistrationManager(conf)) {
+            return rm.initNewCluster();
+        }
+    }
+
+    /**
+     * Nukes existing cluster metadata, but only if the provided
+     * ledgersRootPath matches the configuration's zkLedgersRootPath and the
+     * provided instanceid matches the cluster metadata. If force is
+     * specified then the instanceid will not be validated.
+     *
+     * @param conf
+     * @param ledgersRootPath
+     * @param instanceId
+     * @param force
+     * @return
+     * @throws Exception
+     */
+    public static boolean nukeExistingCluster(ServerConfiguration conf, String 
ledgersRootPath, String instanceId,
+            boolean force) throws Exception {
+        String confLedgersRootPath = conf.getZkLedgersRootPath();
+        if (!confLedgersRootPath.equals(ledgersRootPath)) {
+            LOG.error("Provided ledgerRootPath : {} is not matching with 
config's ledgerRootPath: {}, "
+                    + "so exiting nuke operation", ledgersRootPath, 
confLedgersRootPath);
+            return false;
+        }
+
+        try (RegistrationManager rm = 
RegistrationManager.instantiateRegistrationManager(conf)) {
+            if (!force) {
+                String readInstanceId = rm.getClusterInstanceId();
+                if ((instanceId == null) || 
!instanceId.equals(readInstanceId)) {
+                    LOG.error("Provided InstanceId : {} is not matching with 
cluster InstanceId in ZK: {}", instanceId,
+                            readInstanceId);
+                    return false;
+                }
+            }
+            return rm.nukeExistingCluster();
         }
     }
 
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
index 498a18c..5dea774 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
@@ -112,7 +112,8 @@ class BookieWatcher {
         }
     }
 
-    public Set<BookieSocketAddress> getReadOnlyBookies() throws BKException {
+    public Set<BookieSocketAddress> getReadOnlyBookies()
+            throws BKException {
         try {
             return FutureUtils.result(registrationClient.getReadOnlyBookies(), 
EXCEPTION_FUNC).getValue();
         } catch (BKInterruptedException ie) {
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/RegistrationManager.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/RegistrationManager.java
index d0c304b..0c3922f 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/RegistrationManager.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/RegistrationManager.java
@@ -146,16 +146,32 @@ public interface RegistrationManager extends 
AutoCloseable {
     /**
      * Prepare ledgers root node, availableNode, readonly node..
      *
-     * @param conf the conf
      * @return Returns true if old data exists, false if not.
      */
-    boolean prepareFormat(ServerConfiguration conf) throws Exception;
+    boolean prepareFormat() throws Exception;
+
+    /**
+     * Initializes new cluster by creating required znodes for the cluster. If
+     * ledgersrootpath is already existing then it will error out.
+     *
+     * @return returns true if new cluster is successfully created or false if it failed to initialize.
+     * @throws Exception
+     */
+    boolean initNewCluster() throws Exception;
 
     /**
      * Do format boolean.
      *
-     * @param conf the conf
      * @return Returns true if success do format, false if not.
      */
-    boolean format(ServerConfiguration conf) throws Exception;
+    boolean format() throws Exception;
+
+    /**
+     * Nukes existing cluster metadata.
+     *
+     * @return returns true if cluster metadata is successfully nuked
+     *          or false if it failed to nuke the cluster metadata.
+     * @throws Exception
+     */
+    boolean nukeExistingCluster() throws Exception;
 }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationManager.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationManager.java
index 527af78..b82f406 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationManager.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationManager.java
@@ -26,22 +26,36 @@ import static 
org.apache.bookkeeper.util.BookKeeperConstants.READONLY;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Charsets;
+
 import java.io.IOException;
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
 import lombok.extern.slf4j.Slf4j;
+
 import org.apache.bookkeeper.bookie.BookieException;
 import org.apache.bookkeeper.bookie.BookieException.BookieIllegalOpException;
 import org.apache.bookkeeper.bookie.BookieException.CookieNotFoundException;
 import org.apache.bookkeeper.bookie.BookieException.MetadataStoreException;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BKException.BKInterruptedException;
+import org.apache.bookkeeper.client.BKException.MetaStoreException;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.LayoutManager;
+import org.apache.bookkeeper.meta.LedgerManagerFactory;
 import org.apache.bookkeeper.meta.ZkLayoutManager;
 import org.apache.bookkeeper.meta.ZkLedgerUnderreplicationManager;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.util.BookKeeperConstants;
 import org.apache.bookkeeper.util.ZkUtils;
@@ -59,6 +73,7 @@ import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.Watcher.Event.EventType;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
 import org.apache.zookeeper.ZKUtil;
+import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
@@ -69,6 +84,18 @@ import org.apache.zookeeper.data.Stat;
 @Slf4j
 public class ZKRegistrationManager implements RegistrationManager {
 
+    private static final Function<Throwable, BKException> EXCEPTION_FUNC = 
cause -> {
+        if (cause instanceof BKException) {
+            log.error("Failed to get bookie list : ", cause);
+            return (BKException) cause;
+        } else if (cause instanceof InterruptedException) {
+            log.error("Interrupted reading bookie list : ", cause);
+            return new BKInterruptedException();
+        } else {
+            return new MetaStoreException();
+        }
+    };
+
     private ServerConfiguration conf;
     private ZooKeeper zk;
     private List<ACL> zkAcls;
@@ -421,97 +448,153 @@ public class ZKRegistrationManager implements 
RegistrationManager {
     }
 
     @Override
-    public boolean prepareFormat(ServerConfiguration conf) throws Exception {
-        try (ZooKeeper zk = ZooKeeperClient.newBuilder()
-            .connectString(conf.getZkServers())
-            .sessionTimeoutMs(conf.getZkTimeout())
-            .build()) {
-
-            log.info("Formatting ZooKeeper metadata, ledger root path: {}", 
conf.getZkLedgersRootPath());
-            boolean ledgerRootExists = null != zk.exists(
-                conf.getZkLedgersRootPath(), false);
-            boolean availableNodeExists = null != zk.exists(
-                conf.getZkAvailableBookiesPath(), false);
-            List<ACL> zkAcls = ZkUtils.getACLs(conf);
-            // Create ledgers root node if not exists
-            if (!ledgerRootExists) {
-                zk.create(conf.getZkLedgersRootPath(), 
"".getBytes(Charsets.UTF_8),
-                    zkAcls, CreateMode.PERSISTENT);
-            }
-            // create available bookies node if not exists
-            if (!availableNodeExists) {
-                zk.create(conf.getZkAvailableBookiesPath(), 
"".getBytes(Charsets.UTF_8),
-                    zkAcls, CreateMode.PERSISTENT);
-            }
+    public boolean prepareFormat() throws Exception {
+        boolean ledgerRootExists = null != 
zk.exists(conf.getZkLedgersRootPath(), false);
+        boolean availableNodeExists = null != 
zk.exists(conf.getZkAvailableBookiesPath(), false);
+        List<ACL> zkAcls = ZkUtils.getACLs(conf);
+        // Create ledgers root node if not exists
+        if (!ledgerRootExists) {
+            zk.create(conf.getZkLedgersRootPath(), 
"".getBytes(Charsets.UTF_8), zkAcls, CreateMode.PERSISTENT);
+        }
+        // create available bookies node if not exists
+        if (!availableNodeExists) {
+            zk.create(conf.getZkAvailableBookiesPath(), 
"".getBytes(Charsets.UTF_8), zkAcls, CreateMode.PERSISTENT);
+        }
 
-            // create readonly bookies node if not exists
-            if (null == zk.exists(conf.getZkAvailableBookiesPath() + "/" + 
READONLY, false)) {
-                zk.create(
-                    conf.getZkAvailableBookiesPath() + "/" + READONLY,
-                    new byte[0],
-                    zkAcls,
-                    CreateMode.PERSISTENT);
-            }
+        // create readonly bookies node if not exists
+        if (null == zk.exists(conf.getZkAvailableBookiesPath() + "/" + 
READONLY, false)) {
+            zk.create(conf.getZkAvailableBookiesPath() + "/" + READONLY, new 
byte[0], zkAcls, CreateMode.PERSISTENT);
+        }
+
+        return ledgerRootExists;
+    }
 
-            return ledgerRootExists;
+    @Override
+    public boolean initNewCluster() throws Exception {
+        String zkLedgersRootPath = conf.getZkLedgersRootPath();
+        String zkServers = conf.getZkServers();
+        String zkAvailableBookiesPath = conf.getZkAvailableBookiesPath();
+        log.info("Initializing ZooKeeper metadata for new cluster, ZKServers: 
{} ledger root path: {}", zkServers,
+                zkLedgersRootPath);
+
+        boolean ledgerRootExists = null != 
zk.exists(conf.getZkLedgersRootPath(), false);
+
+        if (ledgerRootExists) {
+            log.error("Ledger root path: {} already exists", 
conf.getZkLedgersRootPath());
+            return false;
         }
+
+        // Create ledgers root node
+        zk.create(zkLedgersRootPath, "".getBytes(UTF_8), Ids.OPEN_ACL_UNSAFE, 
CreateMode.PERSISTENT);
+
+        // create available bookies node
+        zk.create(zkAvailableBookiesPath, "".getBytes(UTF_8), 
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+        // creates the new layout and stores in zookeeper
+        LedgerManagerFactory.newLedgerManagerFactory(conf, layoutManager);
+
+        // create INSTANCEID
+        String instanceId = UUID.randomUUID().toString();
+        zk.create(conf.getZkLedgersRootPath() + "/" + 
BookKeeperConstants.INSTANCEID, instanceId.getBytes(UTF_8),
+                Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+        log.info("Successfully initiated cluster. ZKServers: {} ledger root 
path: {} instanceId: {}", zkServers,
+                zkLedgersRootPath, instanceId);
+        return true;
     }
 
     @Override
-    public boolean format(ServerConfiguration conf) throws Exception {
-        // Clear underreplicated ledgers
-        try (ZooKeeper zk = ZooKeeperClient.newBuilder()
-            .connectString(conf.getZkServers())
-            .sessionTimeoutMs(conf.getZkTimeout())
-            .build()) {
-            try {
-                ZKUtil.deleteRecursive(zk, 
ZkLedgerUnderreplicationManager.getBasePath(conf.getZkLedgersRootPath())
-                    + BookKeeperConstants.DEFAULT_ZK_LEDGERS_ROOT_PATH);
-            } catch (KeeperException.NoNodeException e) {
-                if (log.isDebugEnabled()) {
-                    log.debug("underreplicated ledgers root path node not 
exists in zookeeper to delete");
+    public boolean nukeExistingCluster() throws Exception {
+        String zkLedgersRootPath = conf.getZkLedgersRootPath();
+        String zkServers = conf.getZkServers();
+        log.info("Nuking ZooKeeper metadata of existing cluster, ZKServers: {} 
ledger root path: {}",
+                zkServers, zkLedgersRootPath);
+
+        boolean ledgerRootExists = null != 
zk.exists(conf.getZkLedgersRootPath(), false);
+        if (!ledgerRootExists) {
+            log.info("There is no existing cluster with ledgersRootPath: {} in 
ZKServers: {}, "
+                    + "so exiting nuke operation", zkLedgersRootPath, 
conf.getZkServers());
+            return true;
+        }
+
+        String availableBookiesPath = conf.getZkAvailableBookiesPath();
+        boolean availableNodeExists = null != zk.exists(availableBookiesPath, 
false);
+        try (RegistrationClient regClient = new ZKRegistrationClient()) {
+            regClient.initialize(new ClientConfiguration(conf), null, 
NullStatsLogger.INSTANCE, Optional.empty());
+            if (availableNodeExists) {
+                Collection<BookieSocketAddress> rwBookies = FutureUtils
+                        .result(regClient.getWritableBookies(), 
EXCEPTION_FUNC).getValue();
+                if (rwBookies != null && !rwBookies.isEmpty()) {
+                    log.error("Bookies are still up and connected to this 
cluster, "
+                            + "stop all bookies before nuking the cluster");
+                    return false;
                 }
-            }
 
-            // Clear underreplicatedledger locks
-            try {
-                ZKUtil.deleteRecursive(zk, 
ZkLedgerUnderreplicationManager.getBasePath(conf.getZkLedgersRootPath())
-                    + '/' + BookKeeperConstants.UNDER_REPLICATION_LOCK);
-            } catch (KeeperException.NoNodeException e) {
-                if (log.isDebugEnabled()) {
-                    log.debug("underreplicatedledger locks node not exists in 
zookeeper to delete");
+                String readOnlyBookieRegPath = availableBookiesPath + "/" + 
BookKeeperConstants.READONLY;
+                boolean readonlyNodeExists = null != 
zk.exists(readOnlyBookieRegPath, false);
+                if (readonlyNodeExists) {
+                    Collection<BookieSocketAddress> roBookies = FutureUtils
+                            .result(regClient.getReadOnlyBookies(), 
EXCEPTION_FUNC).getValue();
+                    if (roBookies != null && !roBookies.isEmpty()) {
+                        log.error("Readonly Bookies are still up and connected 
to this cluster, "
+                                + "stop all bookies before nuking the 
cluster");
+                        return false;
+                    }
                 }
             }
+        }
 
-            // Clear the cookies
-            try {
-                ZKUtil.deleteRecursive(zk, conf.getZkLedgersRootPath()
-                    + "/cookies");
-            } catch (KeeperException.NoNodeException e) {
-                if (log.isDebugEnabled()) {
-                    log.debug("cookies node not exists in zookeeper to 
delete");
-                }
+        LedgerManagerFactory ledgerManagerFactory = 
LedgerManagerFactory.newLedgerManagerFactory(conf, layoutManager);
+        return ledgerManagerFactory.validateAndNukeExistingCluster(conf, 
layoutManager);
+    }
+
+    @Override
+    public boolean format() throws Exception {
+        // Clear underreplicated ledgers
+        try {
+            ZKUtil.deleteRecursive(zk, 
ZkLedgerUnderreplicationManager.getBasePath(conf.getZkLedgersRootPath())
+                    + BookKeeperConstants.DEFAULT_ZK_LEDGERS_ROOT_PATH);
+        } catch (KeeperException.NoNodeException e) {
+            if (log.isDebugEnabled()) {
+                log.debug("underreplicated ledgers root path node not exists 
in zookeeper to delete");
             }
+        }
 
-            // Clear the INSTANCEID
-            try {
-                zk.delete(conf.getZkLedgersRootPath() + "/"
-                    + BookKeeperConstants.INSTANCEID, -1);
-            } catch (KeeperException.NoNodeException e) {
-                if (log.isDebugEnabled()) {
-                    log.debug("INSTANCEID not exists in zookeeper to delete");
-                }
+        // Clear underreplicatedledger locks
+        try {
+            ZKUtil.deleteRecursive(zk, 
ZkLedgerUnderreplicationManager.getBasePath(conf.getZkLedgersRootPath()) + '/'
+                    + BookKeeperConstants.UNDER_REPLICATION_LOCK);
+        } catch (KeeperException.NoNodeException e) {
+            if (log.isDebugEnabled()) {
+                log.debug("underreplicatedledger locks node not exists in 
zookeeper to delete");
             }
+        }
 
-            // create INSTANCEID
-            String instanceId = UUID.randomUUID().toString();
-            zk.create(conf.getZkLedgersRootPath() + "/"
-                    + BookKeeperConstants.INSTANCEID, 
instanceId.getBytes(Charsets.UTF_8),
-                ZkUtils.getACLs(conf), CreateMode.PERSISTENT);
+        // Clear the cookies
+        try {
+            ZKUtil.deleteRecursive(zk, conf.getZkLedgersRootPath() + 
"/cookies");
+        } catch (KeeperException.NoNodeException e) {
+            if (log.isDebugEnabled()) {
+                log.debug("cookies node not exists in zookeeper to delete");
+            }
+        }
 
-            log.info("Successfully formatted BookKeeper metadata");
-            return true;
+        // Clear the INSTANCEID
+        try {
+            zk.delete(conf.getZkLedgersRootPath() + "/" + 
BookKeeperConstants.INSTANCEID, -1);
+        } catch (KeeperException.NoNodeException e) {
+            if (log.isDebugEnabled()) {
+                log.debug("INSTANCEID not exists in zookeeper to delete");
+            }
         }
+
+        // create INSTANCEID
+        String instanceId = UUID.randomUUID().toString();
+        zk.create(conf.getZkLedgersRootPath() + "/" + 
BookKeeperConstants.INSTANCEID,
+                instanceId.getBytes(Charsets.UTF_8), ZkUtils.getACLs(conf), 
CreateMode.PERSISTENT);
+
+        log.info("Successfully formatted BookKeeper metadata");
+        return true;
     }
 
     @Override
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractHierarchicalLedgerManager.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractHierarchicalLedgerManager.java
index 9d25ac3..510f04c 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractHierarchicalLedgerManager.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractHierarchicalLedgerManager.java
@@ -213,5 +213,4 @@ public abstract class AbstractHierarchicalLedgerManager 
extends AbstractZkLedger
         }
         return zkActiveLedgers;
     }
-
 }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
index 09c7a39..56fd035 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
@@ -63,7 +63,7 @@ import org.slf4j.LoggerFactory;
 /**
  * Abstract ledger manager based on zookeeper, which provides common methods 
such as query zk nodes.
  */
-abstract class AbstractZkLedgerManager implements LedgerManager, Watcher {
+public abstract class AbstractZkLedgerManager implements LedgerManager, 
Watcher {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(AbstractZkLedgerManager.class);
 
@@ -79,6 +79,9 @@ abstract class AbstractZkLedgerManager implements 
LedgerManager, Watcher {
     // we use this to prevent long stack chains from building up in callbacks
     protected ScheduledExecutorService scheduler;
 
+    /**
+     * ReadLedgerMetadataTask class.
+     */
     protected class ReadLedgerMetadataTask implements Runnable, 
GenericCallback<LedgerMetadata> {
 
         final long ledgerId;
@@ -496,18 +499,42 @@ abstract class AbstractZkLedgerManager implements 
LedgerManager, Watcher {
      *          Znode Name
      * @return true  if the znode is a special znode otherwise false
      */
-     protected static boolean isSpecialZnode(String znode) {
+     public static boolean isSpecialZnode(String znode) {
         if (BookKeeperConstants.AVAILABLE_NODE.equals(znode)
                 || BookKeeperConstants.COOKIE_NODE.equals(znode)
                 || BookKeeperConstants.LAYOUT_ZNODE.equals(znode)
                 || BookKeeperConstants.INSTANCEID.equals(znode)
-                || BookKeeperConstants.UNDER_REPLICATION_NODE.equals(znode)) {
+                || BookKeeperConstants.UNDER_REPLICATION_NODE.equals(znode)
+                || LegacyHierarchicalLedgerManager.IDGEN_ZNODE.equals(znode)
+                || LongHierarchicalLedgerManager.IDGEN_ZNODE.equals(znode)
+                || znode.startsWith(ZkLedgerIdGenerator.LEDGER_ID_GEN_PREFIX)) 
{
             return true;
         }
         return false;
     }
 
     /**
+     * Regex matching the name of a top level parent znode for ledgers (in
+     * HierarchicalLedgerManager) or znode of a ledger (in FlatLedgerManager).
+     *
+     * @return
+     */
+    protected abstract String getLedgerParentNodeRegex();
+
+    /**
+     * whether the child of ledgersRootPath is a top level parent znode for
+     * ledgers (in HierarchicalLedgerManager) or znode of a ledger (in
+     * FlatLedgerManager).
+     *
+     * @param znode
+     *            Znode Name
+     * @return
+     */
+    public boolean isLedgerParentNode(String znode) {
+        return znode.matches(getLedgerParentNodeRegex());
+    }
+
+    /**
      * Convert the ZK retrieved ledger nodes to a HashSet for easier 
comparisons.
      *
      * @param ledgerNodes
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManagerFactory.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManagerFactory.java
new file mode 100644
index 0000000..b8dcc03
--- /dev/null
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManagerFactory.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.meta;
+
+import java.io.IOException;
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.conf.AbstractConfiguration;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZKUtil;
+import org.apache.zookeeper.ZooKeeper;
+
+/**
+ * Abstract ledger manager factory based on zookeeper, which provides common
+ * methods such as format and validateAndNukeExistingCluster.
+ */
+@Slf4j
+public abstract class AbstractZkLedgerManagerFactory extends 
LedgerManagerFactory {
+    protected ZooKeeper zk;
+
+    @Override
+    public void format(AbstractConfiguration conf, LayoutManager layoutManager)
+            throws InterruptedException, KeeperException, IOException {
+        try (AbstractZkLedgerManager ledgerManager = (AbstractZkLedgerManager) 
newLedgerManager()) {
+            String ledgersRootPath = conf.getZkLedgersRootPath();
+            List<String> children = zk.getChildren(ledgersRootPath, false);
+            for (String child : children) {
+                if (!AbstractZkLedgerManager.isSpecialZnode(child) && 
ledgerManager.isLedgerParentNode(child)) {
+                    ZKUtil.deleteRecursive(zk, ledgersRootPath + "/" + child);
+                }
+            }
+        }
+        // Delete and recreate the LAYOUT information.
+        super.format(conf, layoutManager);
+    }
+
+    @Override
+    public boolean validateAndNukeExistingCluster(AbstractConfiguration<?> 
conf, LayoutManager layoutManager)
+            throws InterruptedException, KeeperException, IOException {
+        String zkLedgersRootPath = conf.getZkLedgersRootPath();
+        String zkServers = conf.getZkServers();
+        AbstractZkLedgerManager zkLedgerManager = (AbstractZkLedgerManager) 
newLedgerManager();
+
+        /*
+         * before proceeding with nuking existing cluster, make sure there
+         * are no unexpected znodes under ledgersRootPath
+         */
+        List<String> ledgersRootPathChildrenList = 
zk.getChildren(zkLedgersRootPath, false);
+        for (String ledgersRootPathChildren : ledgersRootPathChildrenList) {
+            if 
((!AbstractZkLedgerManager.isSpecialZnode(ledgersRootPathChildren))
+                    && 
(!zkLedgerManager.isLedgerParentNode(ledgersRootPathChildren))) {
+                log.error("Found unexpected znode : {} under ledgersRootPath : 
{} so exiting nuke operation",
+                        ledgersRootPathChildren, zkLedgersRootPath);
+                return false;
+            }
+        }
+
+        // formatting ledgermanager deletes ledger znodes
+        format(conf, layoutManager);
+
+        // now delete all the special nodes recursively
+        ledgersRootPathChildrenList = zk.getChildren(zkLedgersRootPath, false);
+        for (String ledgersRootPathChildren : ledgersRootPathChildrenList) {
+            if 
(AbstractZkLedgerManager.isSpecialZnode(ledgersRootPathChildren)) {
+                ZKUtil.deleteRecursive(zk, zkLedgersRootPath + "/" + 
ledgersRootPathChildren);
+            } else {
+                log.error("Found unexpected znode : {} under ledgersRootPath : 
{} so exiting nuke operation",
+                        ledgersRootPathChildren, zkLedgersRootPath);
+                return false;
+            }
+        }
+
+        // finally deleting the ledgers rootpath
+        zk.delete(zkLedgersRootPath, -1);
+
+        log.info("Successfully nuked existing cluster, ZKServers: {} ledger 
root path: {}",
+                zkServers, zkLedgersRootPath);
+        return true;
+    }
+}
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java
index 26f55dc..7ee2e22 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java
@@ -88,11 +88,6 @@ class FlatLedgerManager extends AbstractZkLedgerManager {
         asyncProcessLedgersInSingleNode(ledgerRootPath, processor, finalCb, 
ctx, successRc, failureRc);
     }
 
-    protected static boolean isSpecialZnode(String znode) {
-        return znode.startsWith(ZkLedgerIdGenerator.LEDGER_ID_GEN_PREFIX)
-            || AbstractZkLedgerManager.isSpecialZnode(znode);
-    }
-
     @Override
     public LedgerRangeIterator getLedgerRanges() {
         return new LedgerRangeIterator() {
@@ -134,4 +129,9 @@ class FlatLedgerManager extends AbstractZkLedgerManager {
             }
         };
     }
+
+    @Override
+    protected String getLedgerParentNodeRegex() {
+        return StringUtils.FLAT_LEDGER_NODE_REGEX;
+    }
 }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManagerFactory.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManagerFactory.java
index ad1c71d..5e2ced5 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManagerFactory.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManagerFactory.java
@@ -21,24 +21,22 @@ import static 
com.google.common.base.Preconditions.checkArgument;
 
 import java.io.IOException;
 import java.util.List;
+
 import org.apache.bookkeeper.conf.AbstractConfiguration;
 import org.apache.bookkeeper.replication.ReplicationException;
 import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZKUtil;
-import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.data.ACL;
 
 /**
  * Flat Ledger Manager Factory.
  */
-public class FlatLedgerManagerFactory extends LedgerManagerFactory {
+public class FlatLedgerManagerFactory extends AbstractZkLedgerManagerFactory {
 
     public static final String NAME = "flat";
     public static final int CUR_VERSION = 1;
 
     AbstractConfiguration conf;
-    ZooKeeper zk;
 
     @Override
     public int getCurrentVersion() {
@@ -84,19 +82,4 @@ public class FlatLedgerManagerFactory extends 
LedgerManagerFactory {
             throws KeeperException, InterruptedException, 
ReplicationException.CompatibilityException {
         return new ZkLedgerUnderreplicationManager(conf, zk);
     }
-
-    @Override
-    public void format(AbstractConfiguration conf, LayoutManager layoutManager)
-            throws InterruptedException, KeeperException, IOException {
-        String ledgersRootPath = conf.getZkLedgersRootPath();
-        List<String> children = zk.getChildren(ledgersRootPath, false);
-        for (String child : children) {
-            if (FlatLedgerManager.isSpecialZnode(child)) {
-                continue;
-            }
-            ZKUtil.deleteRecursive(zk, ledgersRootPath + "/" + child);
-        }
-        // Delete and recreate the LAYOUT information.
-        super.format(conf, layoutManager);
-    }
 }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java
index 07607e4..946ed2a 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java
@@ -118,4 +118,9 @@ class HierarchicalLedgerManager extends 
AbstractHierarchicalLedgerManager {
         }
 
     }
+
+    @Override
+    protected String getLedgerParentNodeRegex() {
+        return StringUtils.HIERARCHICAL_LEDGER_PARENT_NODE_REGEX;
+    }
 }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManagerFactory.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManagerFactory.java
index eeeb6e9..4f2a2bf 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManagerFactory.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManagerFactory.java
@@ -38,4 +38,8 @@ public class HierarchicalLedgerManagerFactory extends 
LegacyHierarchicalLedgerMa
                 subIdGenerator, zkAcls);
     }
 
+    @Override
+    public LedgerManager newLedgerManager() {
+        return new HierarchicalLedgerManager(conf, zk);
+    }
 }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java
index 99df4cb..62ff092 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java
@@ -263,4 +263,20 @@ public abstract class LedgerManagerFactory implements 
AutoCloseable {
         // Create new layout information again.
         createNewLMFactory(conf, lm, factoryClass);
     }
+
+    /**
+     * This method makes sure there are no unexpected znodes under 
ledgersRootPath
+     * and then it proceeds with ledger metadata formatting and nuking the 
cluster
+     * ZK state info.
+     *
+     * @param conf
+     *          Configuration instance
+     * @param lm
+     *          Layout manager
+     * @throws IOException
+     * @throws KeeperException
+     * @throws InterruptedException
+     */
+    public abstract boolean 
validateAndNukeExistingCluster(AbstractConfiguration<?> conf, LayoutManager lm)
+            throws InterruptedException, KeeperException, IOException;
 }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManager.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManager.java
index ea85a8d..59c3f2c 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManager.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManager.java
@@ -125,7 +125,7 @@ class LegacyHierarchicalLedgerManager extends 
AbstractHierarchicalLedgerManager
         asyncProcessLevelNodes(ledgerRootPath, new Processor<String>() {
             @Override
             public void process(final String l1Node, final 
AsyncCallback.VoidCallback cb1) {
-                if (isSpecialZnode(l1Node)) {
+                if (!isLedgerParentNode(l1Node)) {
                     cb1.processResult(successRc, null, context);
                     return;
                 }
@@ -147,20 +147,20 @@ class LegacyHierarchicalLedgerManager extends 
AbstractHierarchicalLedgerManager
         }, finalCb, context, successRc, failureRc);
     }
 
-    protected static boolean isSpecialZnode(String znode) {
-        return IDGEN_ZNODE.equals(znode) || 
LongHierarchicalLedgerManager.IDGEN_ZNODE.equals(znode)
-            || AbstractHierarchicalLedgerManager.isSpecialZnode(znode);
+    @Override
+    protected String getLedgerParentNodeRegex() {
+        return StringUtils.LEGACYHIERARCHICAL_LEDGER_PARENT_NODE_REGEX;
     }
 
     @Override
     public LedgerRangeIterator getLedgerRanges() {
-        return new HierarchicalLedgerRangeIterator();
+        return new LegacyHierarchicalLedgerRangeIterator();
     }
 
     /**
      * Iterator through each metadata bucket with hierarchical mode.
      */
-    private class HierarchicalLedgerRangeIterator implements 
LedgerRangeIterator {
+    private class LegacyHierarchicalLedgerRangeIterator implements 
LedgerRangeIterator {
         private Iterator<String> l1NodesIter = null;
         private Iterator<String> l2NodesIter = null;
         private String curL1Nodes = "";
@@ -182,7 +182,7 @@ class LegacyHierarchicalLedgerManager extends 
AbstractHierarchicalLedgerManager
                     return false;
                 }
                 // Top level nodes are always exactly 2 digits long. (Don't 
pick up long hierarchical top level nodes)
-                if (isSpecialZnode(curL1Nodes) || curL1Nodes.length() > 2) {
+                if (!isLedgerParentNode(curL1Nodes)) {
                     continue;
                 }
                 List<String> l2Nodes = zk.getChildren(ledgerRootPath + "/" + 
curL1Nodes, null);
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManagerFactory.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManagerFactory.java
index 70c1abd..cd2bce1 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManagerFactory.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManagerFactory.java
@@ -26,20 +26,17 @@ import org.apache.bookkeeper.conf.AbstractConfiguration;
 import org.apache.bookkeeper.replication.ReplicationException;
 import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZKUtil;
-import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.data.ACL;
 
 /**
  * Hierarchical Ledger Manager Factory.
  */
-public class LegacyHierarchicalLedgerManagerFactory extends 
LedgerManagerFactory {
+public class LegacyHierarchicalLedgerManagerFactory extends 
AbstractZkLedgerManagerFactory {
 
     public static final String NAME = "legacyhierarchical";
     public static final int CUR_VERSION = 1;
 
     AbstractConfiguration conf;
-    ZooKeeper zk;
 
     @Override
     public int getCurrentVersion() {
@@ -79,7 +76,7 @@ public class LegacyHierarchicalLedgerManagerFactory extends 
LedgerManagerFactory
 
     @Override
     public LedgerManager newLedgerManager() {
-        return new HierarchicalLedgerManager(conf, zk);
+        return new LegacyHierarchicalLedgerManager(conf, zk);
     }
 
     @Override
@@ -87,20 +84,4 @@ public class LegacyHierarchicalLedgerManagerFactory extends 
LedgerManagerFactory
             throws KeeperException, InterruptedException, 
ReplicationException.CompatibilityException{
         return new ZkLedgerUnderreplicationManager(conf, zk);
     }
-
-    @Override
-    public void format(AbstractConfiguration conf, LayoutManager layoutManager)
-            throws InterruptedException, KeeperException, IOException {
-        String ledgersRootPath = conf.getZkLedgersRootPath();
-        List<String> children = zk.getChildren(ledgersRootPath, false);
-        for (String child : children) {
-            if (HierarchicalLedgerManager.isSpecialZnode(child)) {
-                continue;
-            }
-            ZKUtil.deleteRecursive(zk, ledgersRootPath + "/" + child);
-        }
-        // Delete and recreate the LAYOUT information.
-        super.format(conf, layoutManager);
-    }
-
 }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LongHierarchicalLedgerManager.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LongHierarchicalLedgerManager.java
index e5bdb1c..2e69e90 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LongHierarchicalLedgerManager.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LongHierarchicalLedgerManager.java
@@ -102,13 +102,6 @@ class LongHierarchicalLedgerManager extends 
AbstractHierarchicalLedgerManager {
                 successRc, failureRc);
     }
 
-    protected static boolean isSpecialZnode(String znode) {
-        // Check nextnode length. All paths in long hierarchical format 
(3-4-4-4-4)
-        // are at least 3 characters long. This prevents picking up any 
old-style
-        // hierarchical paths (2-4-4)
-        return LegacyHierarchicalLedgerManager.isSpecialZnode(znode) || 
znode.length() < 3;
-    }
-
     private class RecursiveProcessor implements Processor<String> {
         private final int level;
         private final String path;
@@ -130,7 +123,7 @@ class LongHierarchicalLedgerManager extends 
AbstractHierarchicalLedgerManager {
         @Override
         public void process(String lNode, VoidCallback cb) {
             String nodePath = path + "/" + lNode;
-            if ((level == 0) && isSpecialZnode(lNode)) {
+            if ((level == 0) && !isLedgerParentNode(lNode)) {
                 cb.processResult(successRc, null, context);
                 return;
             } else if (level < 3) {
@@ -262,7 +255,7 @@ class LongHierarchicalLedgerManager extends 
AbstractHierarchicalLedgerManager {
             void advance() throws IOException {
                 while (thisLevelIterator.hasNext()) {
                     String node = thisLevelIterator.next();
-                    if (level == 0 && isSpecialZnode(node)) {
+                    if (level == 0 && !isLedgerParentNode(node)) {
                         continue;
                     }
                     LedgerRangeIterator nextIterator = level < 3
@@ -311,4 +304,9 @@ class LongHierarchicalLedgerManager extends 
AbstractHierarchicalLedgerManager {
             return rootIterator.next();
         }
     }
+
+    @Override
+    protected String getLedgerParentNodeRegex() {
+        return StringUtils.LONGHIERARCHICAL_LEDGER_PARENT_NODE_REGEX;
+    }
 }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java
index 0e69385..cb8e8c3 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java
@@ -36,6 +36,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.LedgerMetadata;
 import org.apache.bookkeeper.conf.AbstractConfiguration;
@@ -57,21 +58,23 @@ import 
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import 
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.LedgerMetadataListener;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
 import org.apache.bookkeeper.replication.ReplicationException;
+import org.apache.bookkeeper.util.BookKeeperConstants;
 import org.apache.bookkeeper.util.StringUtils;
 import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.bookkeeper.versioning.Version;
 import org.apache.bookkeeper.versioning.Versioned;
 import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZKUtil;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.data.ACL;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 /**
  * MetaStore Based Ledger Manager Factory.
  */
+@Slf4j
 public class MSLedgerManagerFactory extends LedgerManagerFactory {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(MSLedgerManagerFactory.class);
@@ -616,6 +619,25 @@ public class MSLedgerManagerFactory extends 
LedgerManagerFactory {
         public LedgerRangeIterator getLedgerRanges() {
             return new MSLedgerRangeIterator();
         }
+
+        /**
+         * Whether the znode is a special znode.
+         *
+         * @param znode
+         *          Znode Name
+         * @return true  if the znode is a special znode otherwise false
+         */
+         public static boolean isSpecialZnode(String znode) {
+            if (BookKeeperConstants.AVAILABLE_NODE.equals(znode)
+                    || BookKeeperConstants.COOKIE_NODE.equals(znode)
+                    || BookKeeperConstants.LAYOUT_ZNODE.equals(znode)
+                    || BookKeeperConstants.INSTANCEID.equals(znode)
+                    || BookKeeperConstants.UNDER_REPLICATION_NODE.equals(znode)
+                    || MsLedgerManager.IDGEN_ZNODE.equals(znode)) {
+                return true;
+            }
+            return false;
+        }
     }
 
     @Override
@@ -720,4 +742,45 @@ public class MSLedgerManagerFactory extends 
LedgerManagerFactory {
         super.format(conf, layoutManager);
     }
 
+    @Override
+    public boolean validateAndNukeExistingCluster(AbstractConfiguration<?> 
conf, LayoutManager layoutManager)
+            throws InterruptedException, KeeperException, IOException {
+        String zkLedgersRootPath = conf.getZkLedgersRootPath();
+        String zkServers = conf.getZkServers();
+
+        /*
+         * before proceeding with nuking existing cluster, make sure there
+         * are no unexpected znodes under ledgersRootPath
+         */
+        List<String> ledgersRootPathChildrenList = 
zk.getChildren(zkLedgersRootPath, false);
+        for (String ledgersRootPathChildren : ledgersRootPathChildrenList) {
+            if ((!MsLedgerManager.isSpecialZnode(ledgersRootPathChildren))) {
+                log.error("Found unexpected znode : {} under ledgersRootPath : 
{} so exiting nuke operation",
+                        ledgersRootPathChildren, zkLedgersRootPath);
+                return false;
+            }
+        }
+
+        // formatting ledgermanager deletes ledger znodes
+        format(conf, layoutManager);
+
+        // now delete all the special nodes recursively
+        ledgersRootPathChildrenList = zk.getChildren(zkLedgersRootPath, false);
+        for (String ledgersRootPathChildren : ledgersRootPathChildrenList) {
+            if (MsLedgerManager.isSpecialZnode(ledgersRootPathChildren)) {
+                ZKUtil.deleteRecursive(zk, zkLedgersRootPath + "/" + 
ledgersRootPathChildren);
+            } else {
+                log.error("Found unexpected znode : {} under ledgersRootPath : 
{} so exiting nuke operation",
+                        ledgersRootPathChildren, zkLedgersRootPath);
+                return false;
+            }
+        }
+
+        // finally deleting the ledgers rootpath
+        zk.delete(zkLedgersRootPath, -1);
+
+        log.info("Successfully nuked existing cluster, ZKServers: {} ledger 
root path: {}",
+                zkServers, zkLedgersRootPath);
+        return true;
+    }
 }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/StringUtils.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/StringUtils.java
index 44f5575..f55edd1 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/StringUtils.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/StringUtils.java
@@ -29,6 +29,14 @@ public class StringUtils {
 
     // Ledger Node Prefix
     public static final String LEDGER_NODE_PREFIX = "L";
+    // Ledger znode in flatledgermanager layout will be "L" (prefix) +"%010d" 
(id in 10 digits)
+    public static final String FLAT_LEDGER_NODE_REGEX = 
StringUtils.LEDGER_NODE_PREFIX + "\\d{10}";
+    // top level znode in legacyhierarchicalledgermanager will be just 2 digits
+    public static final String LEGACYHIERARCHICAL_LEDGER_PARENT_NODE_REGEX = 
"\\d{2}";
+    // top level znode in longhierarchicalledgermanager will be just 3 digits
+    public static final String LONGHIERARCHICAL_LEDGER_PARENT_NODE_REGEX = 
"\\d{3}";
+    // top level znode in hierarchicalledgermanager will be 2 or 3 digits
+    public static final String HIERARCHICAL_LEDGER_PARENT_NODE_REGEX = 
"\\d{2,3}";
 
     /**
      * Formats ledger ID according to ZooKeeper rules.
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperAdminTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperAdminTest.java
index 3f3e840..e1cb252 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperAdminTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperAdminTest.java
@@ -20,16 +20,22 @@
  */
 package org.apache.bookkeeper.client;
 
+import static com.google.common.base.Charsets.UTF_8;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import com.google.common.net.InetAddresses;
 import java.io.File;
+import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
 
 import org.apache.bookkeeper.bookie.Bookie;
 import org.apache.bookkeeper.client.BKException.BKIllegalOpException;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.ZkLedgerUnderreplicationManager;
 import 
org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
@@ -37,6 +43,8 @@ import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.apache.bookkeeper.test.annotations.FlakyTest;
 import org.apache.bookkeeper.util.BookKeeperConstants;
 import org.apache.commons.io.FileUtils;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs.Ids;
 import org.junit.Assert;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -235,7 +243,7 @@ public class BookKeeperAdminTest extends 
BookKeeperClusterTestCase {
         bkAdmin.close();
     }
 
-    @Test(timeout = 6000000)
+    @Test
     public void testBookieInit() throws Exception {
         int bookieindex = 0;
         ServerConfiguration confOfExistingBookie = bsConfs.get(bookieindex);
@@ -275,4 +283,200 @@ public class BookKeeperAdminTest extends 
BookKeeperClusterTestCase {
         Assert.assertTrue("initBookie shouldn't succeeded",
                 BookKeeperAdmin.initBookie(confOfExistingBookie));
     }
+
+    @Test
+    public void testInitNewCluster() throws Exception {
+        ServerConfiguration newConfig = new ServerConfiguration(baseConf);
+        String ledgersRootPath = "/testledgers";
+        newConfig.setZkLedgersRootPath(ledgersRootPath);
+        newConfig.setZkServers(zkUtil.getZooKeeperConnectString());
+        Assert.assertTrue("New cluster should be initialized successfully", 
BookKeeperAdmin.initNewCluster(newConfig));
+
+        Assert.assertTrue("Cluster rootpath should have been created 
successfully " + ledgersRootPath,
+                (zkc.exists(ledgersRootPath, false) != null));
+        String availableBookiesPath = newConfig.getZkAvailableBookiesPath();
+        Assert.assertTrue("AvailableBookiesPath should have been created 
successfully " + availableBookiesPath,
+                (zkc.exists(availableBookiesPath, false) != null));
+        String instanceIdPath = newConfig.getZkLedgersRootPath() + "/" + 
BookKeeperConstants.INSTANCEID;
+        Assert.assertTrue("InstanceId node should have been created 
successfully" + instanceIdPath,
+                (zkc.exists(instanceIdPath, false) != null));
+
+        String ledgersLayout = ledgersRootPath + "/" + 
BookKeeperConstants.LAYOUT_ZNODE;
+        Assert.assertTrue("Layout node should have been created successfully" 
+ ledgersLayout,
+                (zkc.exists(ledgersLayout, false) != null));
+
+        /*
+         * create ephemeral znodes simulating the existence of registered Bookies in the cluster
+         */
+        int numOfBookies = 3;
+        Random rand = new Random();
+        for (int i = 0; i < numOfBookies; i++) {
+            String ipString = 
InetAddresses.fromInteger(rand.nextInt()).getHostAddress();
+            String regPath = newConfig.getZkAvailableBookiesPath() + "/" + 
ipString + ":3181";
+            zkc.create(regPath, new byte[0], Ids.OPEN_ACL_UNSAFE, 
CreateMode.EPHEMERAL);
+        }
+
+        /*
+         * now it should be possible to create a ledger and delete it again
+         */
+        BookKeeper bk = new BookKeeper(new ClientConfiguration(newConfig));
+        LedgerHandle lh = bk.createLedger(numOfBookies, numOfBookies, 
numOfBookies, BookKeeper.DigestType.MAC,
+                new byte[0]);
+        bk.deleteLedger(lh.ledgerId);
+        bk.close();
+    }
+
+    @Test
+    public void testNukeExistingClusterWithForceOption() throws Exception {
+        String ledgersRootPath = "/testledgers";
+        ServerConfiguration newConfig = new ServerConfiguration(baseConf);
+        newConfig.setZkLedgersRootPath(ledgersRootPath);
+        List<String> bookiesRegPaths = new ArrayList<String>();
+        initiateNewClusterAndCreateLedgers(newConfig, bookiesRegPaths);
+
+        /*
+         * before nuking the existing cluster, bookies shouldn't be registered
+         * anymore, so delete the registration znodes created by the helper
+         */
+        for (int i = 0; i < bookiesRegPaths.size(); i++) {
+            zkc.delete(bookiesRegPaths.get(i), -1);
+        }
+
+        Assert.assertTrue("New cluster should be nuked successfully",
+                BookKeeperAdmin.nukeExistingCluster(newConfig, 
ledgersRootPath, null, true));
+        Assert.assertTrue("Cluster rootpath should have been deleted 
successfully " + ledgersRootPath,
+                (zkc.exists(ledgersRootPath, false) == null));
+    }
+
+    @Test
+    public void testNukeExistingClusterWithInstanceId() throws Exception {
+        String ledgersRootPath = "/testledgers";
+        ServerConfiguration newConfig = new ServerConfiguration(baseConf);
+        newConfig.setZkLedgersRootPath(ledgersRootPath);
+        List<String> bookiesRegPaths = new ArrayList<String>();
+        initiateNewClusterAndCreateLedgers(newConfig, bookiesRegPaths);
+
+        /*
+         * before nuking the existing cluster, bookies shouldn't be registered
+         * anymore, so delete the registration znodes created by the helper
+         */
+        for (int i = 0; i < bookiesRegPaths.size(); i++) {
+            zkc.delete(bookiesRegPaths.get(i), -1);
+        }
+
+        byte[] data = zkc.getData(newConfig.getZkLedgersRootPath() + "/" + 
BookKeeperConstants.INSTANCEID, false, null);
+        String readInstanceId = new String(data, UTF_8);
+
+        Assert.assertTrue("New cluster should be nuked successfully",
+                BookKeeperAdmin.nukeExistingCluster(newConfig, 
ledgersRootPath, readInstanceId, false));
+        Assert.assertTrue("Cluster rootpath should have been deleted 
successfully " + ledgersRootPath,
+                (zkc.exists(ledgersRootPath, false) == null));
+    }
+
+    @Test
+    public void tryNukingExistingClustersWithInvalidParams() throws Exception {
+        String ledgersRootPath = "/testledgers";
+        ServerConfiguration newConfig = new ServerConfiguration(baseConf);
+        newConfig.setZkLedgersRootPath(ledgersRootPath);
+        List<String> bookiesRegPaths = new ArrayList<String>();
+        initiateNewClusterAndCreateLedgers(newConfig, bookiesRegPaths);
+
+        /*
+         * create a ledger with a specific ledgerid, so it can be reopened by id after the failed nuke attempts
+         */
+        BookKeeper bk = new BookKeeper(new ClientConfiguration(newConfig));
+        long ledgerId = 23456789L;
+        LedgerHandle lh = bk.createLedgerAdv(ledgerId, 1, 1, 1, 
BookKeeper.DigestType.MAC, new byte[0], null);
+        lh.close();
+
+        /*
+         * read the instanceId from its znode; it is passed to the non-forced nuke calls below
+         */
+        byte[] data = zkc.getData(newConfig.getZkLedgersRootPath() + "/" + 
BookKeeperConstants.INSTANCEID, false, null);
+        String readInstanceId = new String(data, UTF_8);
+
+        /*
+         * register a RO bookie: an ephemeral znode under the READONLY child of the available bookies path
+         */
+        String ipString = InetAddresses.fromInteger((new 
Random()).nextInt()).getHostAddress();
+        String roBookieRegPath = newConfig.getZkAvailableBookiesPath() + "/" + 
BookKeeperConstants.READONLY + "/"
+                + ipString + ":3181";
+        zkc.create(roBookieRegPath, new byte[0], Ids.OPEN_ACL_UNSAFE, 
CreateMode.EPHEMERAL);
+
+        Assert.assertFalse("Cluster should'nt be nuked since instanceid is not 
provided and force option is not set",
+                BookKeeperAdmin.nukeExistingCluster(newConfig, 
ledgersRootPath, null, false));
+        Assert.assertFalse("Cluster should'nt be nuked since incorrect 
instanceid is provided",
+                BookKeeperAdmin.nukeExistingCluster(newConfig, 
ledgersRootPath, "incorrectinstanceid", false));
+        Assert.assertFalse("Cluster should'nt be nuked since bookies are still 
registered",
+                BookKeeperAdmin.nukeExistingCluster(newConfig, 
ledgersRootPath, readInstanceId, false));
+        /*
+         * delete the registration znodes of all rw bookies; the ro bookie stays registered
+         */
+        for (int i = 0; i < bookiesRegPaths.size(); i++) {
+            zkc.delete(bookiesRegPaths.get(i), -1);
+        }
+        Assert.assertFalse("Cluster should'nt be nuked since ro bookie is 
still registered",
+                BookKeeperAdmin.nukeExistingCluster(newConfig, 
ledgersRootPath, readInstanceId, false));
+
+        /*
+         * make sure no node was deleted by the rejected nuke attempts above
+         */
+        Assert.assertTrue("Cluster rootpath should be existing " + 
ledgersRootPath,
+                (zkc.exists(ledgersRootPath, false) != null));
+        String availableBookiesPath = newConfig.getZkAvailableBookiesPath();
+        Assert.assertTrue("AvailableBookiesPath should be existing " + 
availableBookiesPath,
+                (zkc.exists(availableBookiesPath, false) != null));
+        String instanceIdPath = newConfig.getZkLedgersRootPath() + "/" + 
BookKeeperConstants.INSTANCEID;
+        Assert.assertTrue("InstanceId node should be existing" + 
instanceIdPath,
+                (zkc.exists(instanceIdPath, false) != null));
+        String ledgersLayout = ledgersRootPath + "/" + 
BookKeeperConstants.LAYOUT_ZNODE;
+        Assert.assertTrue("Layout node should be existing" + ledgersLayout, 
(zkc.exists(ledgersLayout, false) != null));
+
+        /*
+         * the ledger should not have been deleted: opening it by id must still succeed
+         */
+        lh = bk.openLedgerNoRecovery(ledgerId, BookKeeper.DigestType.MAC, new 
byte[0]);
+        lh.close();
+        bk.close();
+
+        /*
+         * delete the ro bookie registration znode, so that no bookie is registered anymore
+         */
+        zkc.delete(roBookieRegPath, -1);
+
+        Assert.assertTrue("Cluster should be nuked since no bookie is 
registered",
+                BookKeeperAdmin.nukeExistingCluster(newConfig, 
ledgersRootPath, readInstanceId, false));
+        Assert.assertTrue("Cluster rootpath should have been deleted 
successfully " + ledgersRootPath,
+                (zkc.exists(ledgersRootPath, false) == null));
+    }
+
+    void initiateNewClusterAndCreateLedgers(ServerConfiguration newConfig, 
List<String> bookiesRegPaths)
+            throws Exception {
+        newConfig.setZkServers(zkUtil.getZooKeeperConnectString());
+        Assert.assertTrue("New cluster should be initialized successfully", 
BookKeeperAdmin.initNewCluster(newConfig));
+
+        /*
+         * create ephemeral znodes simulating registered Bookies; each path is also appended to bookiesRegPaths
+         */
+        int numberOfBookies = 3;
+        Random rand = new Random();
+        for (int i = 0; i < numberOfBookies; i++) {
+            String ipString = 
InetAddresses.fromInteger(rand.nextInt()).getHostAddress();
+            bookiesRegPaths.add(newConfig.getZkAvailableBookiesPath() + "/" + 
ipString + ":3181");
+            zkc.create(bookiesRegPaths.get(i), new byte[0], 
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+        }
+
+        /*
+         * now it should be possible to create ledgers; they are closed but not deleted here
+         */
+        BookKeeper bk = new BookKeeper(new ClientConfiguration(newConfig));
+        LedgerHandle lh;
+        int numOfLedgers = 5;
+        for (int i = 0; i < numOfLedgers; i++) {
+            lh = bk.createLedger(numberOfBookies, numberOfBookies, 
numberOfBookies, BookKeeper.DigestType.MAC,
+                    new byte[0]);
+            lh.close();
+        }
+        bk.close();
+    }
 }
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerIteratorTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerIteratorTest.java
index 239bc0e..b3aa425 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerIteratorTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerIteratorTest.java
@@ -30,6 +30,8 @@ import static org.junit.Assert.fail;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
 import java.util.Optional;
 import java.util.Queue;
 import java.util.Random;
@@ -50,9 +52,10 @@ import org.apache.bookkeeper.versioning.Version;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.ZooDefs;
 import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
 import org.junit.Test;
 
-
 /**
  * Test the ledger manager iterator.
  */
@@ -435,4 +438,83 @@ public class LedgerManagerIteratorTest extends 
LedgerManagerTestCase {
             thread.join();
         }
     }
+
+    @Test
+    public void testLedgerParentNode() throws Throwable {
+        /*
+         * this testcase applies only to ZK based ledger managers, so it is
+         * not run when MSLedgerManagerFactory is the configured factory
+         */
+        
Assume.assumeTrue(!baseConf.getLedgerManagerFactoryClass().equals(MSLedgerManagerFactory.class));
+        AbstractZkLedgerManager lm = (AbstractZkLedgerManager) 
getLedgerManager();
+        List<Long> ledgerIds;
+        if 
(baseConf.getLedgerManagerFactoryClass().equals(HierarchicalLedgerManagerFactory.class)
+                || 
baseConf.getLedgerManagerFactoryClass().equals(LongHierarchicalLedgerManagerFactory.class))
 {
+            ledgerIds = Arrays.asList(100L, (Integer.MAX_VALUE * 10L));
+        } else {
+            ledgerIds = Arrays.asList(100L, (Integer.MAX_VALUE - 10L));
+        }
+        for (long ledgerId : ledgerIds) {
+            String fullLedgerPath = lm.getLedgerPath(ledgerId);
+            String ledgerPath = 
fullLedgerPath.replaceAll(baseConf.getZkLedgersRootPath() + "/", "");
+            String[] znodesOfLedger = ledgerPath.split("/");
+            Assert.assertTrue(znodesOfLedger[0] + " is supposed to be valid 
parent ",
+                    lm.isLedgerParentNode(znodesOfLedger[0]));
+        }
+    }
+
+    @Test
+    public void testLedgerManagerFormat() throws Throwable {
+        /*
+         * this testcase applies only to ZK based ledger managers, so it is
+         * not run when MSLedgerManagerFactory is the configured factory
+         */
+        
Assume.assumeTrue(!baseConf.getLedgerManagerFactoryClass().equals(MSLedgerManagerFactory.class));
+        AbstractZkLedgerManager lm = (AbstractZkLedgerManager) 
getLedgerManager();
+        Collection<Long> ids = Arrays.asList(1234567890L, 2L, 32345L, 
23456789L);
+        if 
(baseConf.getLedgerManagerFactoryClass().equals(HierarchicalLedgerManagerFactory.class)
+                || 
baseConf.getLedgerManagerFactoryClass().equals(LongHierarchicalLedgerManagerFactory.class))
 {
+            ids = new ArrayList<Long>(ids);
+            ids.add(Integer.MAX_VALUE * 2L);
+            ids.add(1234567891234L);
+        }
+        for (Long id : ids) {
+            createLedger(lm, id, Optional.of(BKException.Code.OK));
+        }
+
+        // create some invalid nodes under zkLedgersRootPath; format is expected to leave them in place
+        Collection<String> invalidZnodes = Arrays.asList("12345", 
"12345678901L", "abc", "123d");
+        for (String invalidZnode : invalidZnodes) {
+            ZkUtils.createFullPathOptimistic(zkc, 
baseConf.getZkLedgersRootPath() + "/" + invalidZnode,
+                    "data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, 
CreateMode.PERSISTENT);
+        }
+
+        /*
+         * get the total count of children under zkLedgersRootPath and also
+         * the count of those children that are parent nodes of ledgers
+         */
+        List<String> childrenOfLedgersRootPath = 
zkc.getChildren(baseConf.getZkLedgersRootPath(), false);
+        int totalChildrenOfLedgersRootPath = childrenOfLedgersRootPath.size();
+        int totalParentNodesOfLedgers = 0;
+        for (String childOfLedgersRootPath : childrenOfLedgersRootPath) {
+            if (lm.isLedgerParentNode(childOfLedgersRootPath)) {
+                totalParentNodesOfLedgers++;
+            }
+        }
+
+        /*
+         * after the ledgermanagerfactory format, only the znodes of created
+         * ledgers under zkLedgersRootPath should have been deleted
+         * recursively, but not special nodes or the invalid nodes created above
+         */
+        ledgerManagerFactory.format(baseConf,
+                new ZkLayoutManager(zkc, baseConf.getZkLedgersRootPath(), 
ZkUtils.getACLs(baseConf)));
+        List<String> childrenOfLedgersRootPathAfterFormat = 
zkc.getChildren(baseConf.getZkLedgersRootPath(), false);
+        int totalChildrenOfLedgersRootPathAfterFormat = 
childrenOfLedgersRootPathAfterFormat.size();
+        Assert.assertEquals("totalChildrenOfLedgersRootPathAfterFormat",
+                totalChildrenOfLedgersRootPath - totalParentNodesOfLedgers, 
totalChildrenOfLedgersRootPathAfterFormat);
+
+        Assert.assertTrue("ChildrenOfLedgersRootPathAfterFormat should contain 
all the invalid znodes created",
+                
childrenOfLedgersRootPathAfterFormat.containsAll(invalidZnodes));
+    }
 }

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to