This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 5dc11a5d4ed [fix] [metadata] Fix zookeeper related flacky test (#21310)
5dc11a5d4ed is described below

commit 5dc11a5d4edbcbc3a13d4e20c9b510ed99b007c9
Author: Yan Zhao <[email protected]>
AuthorDate: Sun Oct 8 16:11:33 2023 +0800

    [fix] [metadata] Fix zookeeper related flacky test (#21310)
    
    Fix zookeeper related flacky test
    
    (cherry picked from commit 9ab7417edc20b6618bac8f66921815c7b9d5e5b8)
---
 pulsar-metadata/pom.xml                            |   7 +
 .../bookkeeper/replication/AuditorBookieTest.java  |   1 -
 .../AuditorCheckAllLedgersTaskTest.java            |   1 -
 .../replication/AuditorLedgerCheckerTest.java      |   1 -
 .../AuditorPeriodicBookieCheckTest.java            |   1 -
 .../replication/AuditorPeriodicCheckTest.java      |   1 -
 .../AuditorPlacementPolicyCheckTaskTest.java       |   1 -
 .../AuditorPlacementPolicyCheckTest.java           |   1 -
 .../replication/AuditorReplicasCheckTaskTest.java  |   1 -
 .../replication/AuditorReplicasCheckTest.java      |   1 -
 .../replication/AuditorRollingRestartTest.java     |   1 -
 .../replication/AuthAutoRecoveryTest.java          |   1 -
 .../replication/AutoRecoveryMainTest.java          |   6 -
 .../replication}/BookKeeperClusterTestCase.java    | 253 ++++++---------------
 .../replication/BookieAutoRecoveryTest.java        |   1 -
 .../replication/BookieLedgerIndexTest.java         |   1 -
 .../TestAutoRecoveryAlongWithBookieServers.java    |   1 -
 .../replication/TestReplicationWorker.java         |   1 -
 .../bookkeeper/replication/ZooKeeperUtil.java      | 215 +++++++++++++++++
 .../bookkeeper/test/BookKeeperClusterTestCase.java |   3 +-
 .../bookkeeper/client/BookKeeperTestClient.java    |   1 -
 21 files changed, 288 insertions(+), 212 deletions(-)

diff --git a/pulsar-metadata/pom.xml b/pulsar-metadata/pom.xml
index 421d461d3b0..3bf76b83367 100644
--- a/pulsar-metadata/pom.xml
+++ b/pulsar-metadata/pom.xml
@@ -86,6 +86,13 @@
     </dependency>
 
     <!-- zookeeper server -->
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>testmocks</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+
     <dependency>
       <groupId>org.xerial.snappy</groupId>
       <artifactId>snappy-java</artifactId>
diff --git 
a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorBookieTest.java
 
b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorBookieTest.java
index 9750fb52d41..14cdf3e1fc2 100644
--- 
a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorBookieTest.java
+++ 
b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorBookieTest.java
@@ -31,7 +31,6 @@ import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
 import org.apache.bookkeeper.net.BookieId;
 import org.apache.bookkeeper.proto.BookieServer;
-import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.apache.pulsar.metadata.bookkeeper.PulsarLedgerAuditorManager;
 import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
diff --git 
a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorCheckAllLedgersTaskTest.java
 
b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorCheckAllLedgersTaskTest.java
index 5c0a3f39325..6b58c72af07 100644
--- 
a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorCheckAllLedgersTaskTest.java
+++ 
b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorCheckAllLedgersTaskTest.java
@@ -30,7 +30,6 @@ import org.apache.bookkeeper.meta.LayoutManager;
 import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
 import org.apache.bookkeeper.stats.NullStatsLogger;
-import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.apache.bookkeeper.test.TestStatsProvider;
 import org.apache.pulsar.metadata.api.MetadataStoreConfig;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
diff --git 
a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java
 
b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java
index ffd71f9311f..ec5f77f7946 100644
--- 
a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java
+++ 
b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java
@@ -67,7 +67,6 @@ import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookieServer;
 import 
org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
 import org.apache.bookkeeper.stats.NullStatsLogger;
-import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.apache.pulsar.metadata.api.MetadataStoreConfig;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
diff --git 
a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicBookieCheckTest.java
 
b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicBookieCheckTest.java
index 4acb207570a..9e8c5a54a5d 100644
--- 
a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicBookieCheckTest.java
+++ 
b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicBookieCheckTest.java
@@ -30,7 +30,6 @@ import org.apache.bookkeeper.conf.TestBKConfiguration;
 import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
 import org.apache.bookkeeper.net.BookieSocketAddress;
-import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.annotations.AfterMethod;
diff --git 
a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
 
b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
index e3b0aa37d7e..0fa6538b3ed 100644
--- 
a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
+++ 
b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
@@ -66,7 +66,6 @@ import 
org.apache.bookkeeper.replication.ReplicationException.UnavailableExcepti
 import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.apache.bookkeeper.test.TestStatsProvider;
 import org.apache.bookkeeper.test.TestStatsProvider.TestOpStatsLogger;
 import org.apache.bookkeeper.test.TestStatsProvider.TestStatsLogger;
diff --git 
a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorPlacementPolicyCheckTaskTest.java
 
b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorPlacementPolicyCheckTaskTest.java
index 1bafb8589d9..8b9c0b14302 100644
--- 
a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorPlacementPolicyCheckTaskTest.java
+++ 
b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorPlacementPolicyCheckTaskTest.java
@@ -31,7 +31,6 @@ import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.meta.LedgerManagerFactory;
 import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
 import org.apache.bookkeeper.stats.NullStatsLogger;
-import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.apache.bookkeeper.test.TestStatsProvider;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git 
a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorPlacementPolicyCheckTest.java
 
b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorPlacementPolicyCheckTest.java
index 159a4e88a33..5637819a927 100644
--- 
a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorPlacementPolicyCheckTest.java
+++ 
b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorPlacementPolicyCheckTest.java
@@ -53,7 +53,6 @@ import 
org.apache.bookkeeper.replication.ReplicationException.CompatibilityExcep
 import 
org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
 import org.apache.bookkeeper.stats.Gauge;
 import org.apache.bookkeeper.stats.NullStatsLogger;
-import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.apache.bookkeeper.test.TestStatsProvider;
 import org.apache.bookkeeper.test.TestStatsProvider.TestOpStatsLogger;
 import org.apache.bookkeeper.test.TestStatsProvider.TestStatsLogger;
diff --git 
a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorReplicasCheckTaskTest.java
 
b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorReplicasCheckTaskTest.java
index 21dd2807b75..62162bd25f4 100644
--- 
a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorReplicasCheckTaskTest.java
+++ 
b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorReplicasCheckTaskTest.java
@@ -31,7 +31,6 @@ import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.meta.LedgerManagerFactory;
 import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
 import org.apache.bookkeeper.stats.NullStatsLogger;
-import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.apache.bookkeeper.test.TestStatsProvider;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git 
a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorReplicasCheckTest.java
 
b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorReplicasCheckTest.java
index a4d6d86dece..2e9dbc15859 100644
--- 
a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorReplicasCheckTest.java
+++ 
b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorReplicasCheckTest.java
@@ -57,7 +57,6 @@ import 
org.apache.bookkeeper.replication.ReplicationException.UnavailableExcepti
 import org.apache.bookkeeper.stats.Gauge;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.apache.bookkeeper.test.TestStatsProvider;
 import org.apache.bookkeeper.test.TestStatsProvider.TestOpStatsLogger;
 import org.apache.bookkeeper.test.TestStatsProvider.TestStatsLogger;
diff --git 
a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorRollingRestartTest.java
 
b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorRollingRestartTest.java
index 2c458d635f5..3e5081ed0ef 100644
--- 
a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorRollingRestartTest.java
+++ 
b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorRollingRestartTest.java
@@ -29,7 +29,6 @@ import org.apache.bookkeeper.meta.LedgerAuditorManager;
 import org.apache.bookkeeper.meta.LedgerManagerFactory;
 import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
 import org.apache.bookkeeper.net.BookieId;
-import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.apache.bookkeeper.test.TestCallbacks;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
diff --git 
a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuthAutoRecoveryTest.java
 
b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuthAutoRecoveryTest.java
index db338d1bb4b..41e159b7771 100644
--- 
a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuthAutoRecoveryTest.java
+++ 
b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuthAutoRecoveryTest.java
@@ -27,7 +27,6 @@ import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.proto.ClientConnectionPeer;
-import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.annotations.AfterMethod;
diff --git 
a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AutoRecoveryMainTest.java
 
b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AutoRecoveryMainTest.java
index 62416968142..1d741c551dd 100644
--- 
a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AutoRecoveryMainTest.java
+++ 
b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AutoRecoveryMainTest.java
@@ -25,10 +25,8 @@ import static org.testng.AssertJUnit.assertTrue;
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.util.concurrent.TimeUnit;
-import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.bookie.BookieImpl;
 import org.apache.bookkeeper.net.BookieId;
-import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.apache.bookkeeper.util.TestUtils;
 import org.apache.pulsar.metadata.bookkeeper.PulsarLedgerManagerFactory;
 import org.apache.pulsar.metadata.bookkeeper.PulsarMetadataClientDriver;
@@ -42,7 +40,6 @@ import org.testng.annotations.Test;
 /**
  * Test the AuditorPeer.
  */
-@Slf4j
 public class AutoRecoveryMainTest extends BookKeeperClusterTestCase {
 
     public AutoRecoveryMainTest() throws Exception {
@@ -68,7 +65,6 @@ public class AutoRecoveryMainTest extends 
BookKeeperClusterTestCase {
      */
     @Test
     public void testStartup() throws Exception {
-        log.info("testStartup()");
         confByIndex(0).setMetadataServiceUri(
                 zkUtil.getMetadataServiceUri().replaceAll("zk://", 
"metadata-store:").replaceAll("/ledgers", ""));
         AutoRecoveryMain main = new AutoRecoveryMain(confByIndex(0));
@@ -89,7 +85,6 @@ public class AutoRecoveryMainTest extends 
BookKeeperClusterTestCase {
      */
     @Test
     public void testShutdown() throws Exception {
-        log.info("testShutdown()");
         confByIndex(0).setMetadataServiceUri(
                 zkUtil.getMetadataServiceUri().replaceAll("zk://", 
"metadata-store:").replaceAll("/ledgers", ""));
         AutoRecoveryMain main = new AutoRecoveryMain(confByIndex(0));
@@ -113,7 +108,6 @@ public class AutoRecoveryMainTest extends 
BookKeeperClusterTestCase {
      */
     @Test
     public void testAutoRecoverySessionLoss() throws Exception {
-        log.info("testAutoRecoverySessionLoss()");
         confByIndex(0).setMetadataServiceUri(
                 zkUtil.getMetadataServiceUri().replaceAll("zk://", 
"metadata-store:").replaceAll("/ledgers", ""));
         confByIndex(1).setMetadataServiceUri(
diff --git 
a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/BookKeeperClusterTestCase.java
 
b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/BookKeeperClusterTestCase.java
similarity index 78%
copy from 
pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/BookKeeperClusterTestCase.java
copy to 
pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/BookKeeperClusterTestCase.java
index 40c2041d4e6..c681a1f0764 100644
--- 
a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/BookKeeperClusterTestCase.java
+++ 
b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/BookKeeperClusterTestCase.java
@@ -21,11 +21,8 @@
  * http://bookkeeper.apache.org
  */
 
-package 
org.apache.pulsar.packages.management.storage.bookkeeper.bookkeeper.test;
+package org.apache.bookkeeper.replication;
 
-import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_SCOPE;
-import static 
org.apache.bookkeeper.bookie.BookKeeperServerStats.LD_INDEX_SCOPE;
-import static 
org.apache.bookkeeper.bookie.BookKeeperServerStats.LD_LEDGER_SCOPE;
 import static org.apache.bookkeeper.util.BookKeeperConstants.AVAILABLE_NODE;
 import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort;
 import static org.testng.Assert.assertFalse;
@@ -41,6 +38,8 @@ import java.util.List;
 import java.util.Optional;
 import java.util.OptionalInt;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.TimeUnit;
@@ -48,58 +47,42 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 import org.apache.bookkeeper.bookie.Bookie;
 import org.apache.bookkeeper.bookie.BookieException;
-import org.apache.bookkeeper.bookie.BookieImpl;
-import org.apache.bookkeeper.bookie.BookieResources;
-import org.apache.bookkeeper.bookie.LedgerDirsManager;
-import org.apache.bookkeeper.bookie.LedgerStorage;
-import org.apache.bookkeeper.bookie.LegacyCookieValidation;
-import org.apache.bookkeeper.bookie.MockUncleanShutdownDetection;
-import org.apache.bookkeeper.bookie.ReadOnlyBookie;
-import org.apache.bookkeeper.bookie.UncleanShutdownDetection;
-import org.apache.bookkeeper.bookie.UncleanShutdownDetectionImpl;
 import org.apache.bookkeeper.client.BookKeeperTestClient;
 import org.apache.bookkeeper.client.TestStatsProvider;
-import org.apache.bookkeeper.common.allocator.ByteBufAllocatorWithOomHandler;
 import org.apache.bookkeeper.common.allocator.PoolingPolicy;
 import org.apache.bookkeeper.conf.AbstractConfiguration;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.conf.TestBKConfiguration;
-import org.apache.bookkeeper.discover.BookieServiceInfo;
-import org.apache.bookkeeper.discover.RegistrationManager;
-import org.apache.bookkeeper.meta.LedgerManager;
-import org.apache.bookkeeper.meta.LedgerManagerFactory;
-import org.apache.bookkeeper.meta.MetadataBookieDriver;
-import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
 import org.apache.bookkeeper.metastore.InMemoryMetaStore;
 import org.apache.bookkeeper.net.BookieId;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookieServer;
-import org.apache.bookkeeper.replication.Auditor;
-import org.apache.bookkeeper.replication.AutoRecoveryMain;
-import org.apache.bookkeeper.replication.ReplicationWorker;
-import org.apache.bookkeeper.server.Main;
-import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.test.ServerTester;
 import org.apache.bookkeeper.test.TmpDirs;
 import org.apache.bookkeeper.test.ZooKeeperCluster;
 import org.apache.bookkeeper.test.ZooKeeperClusterUtil;
-import org.apache.bookkeeper.util.DiskChecker;
 import org.apache.pulsar.common.util.PortManager;
+import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooKeeper;
+import org.awaitility.Awaitility;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.testng.annotations.AfterMethod;
+import org.testng.annotations.AfterTest;
 import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.BeforeTest;
 
 /**
  * A class runs several bookie servers for testing.
  */
 public abstract class BookKeeperClusterTestCase {
 
-    static final Logger LOG = 
LoggerFactory.getLogger(org.apache.bookkeeper.test.BookKeeperClusterTestCase.class);
+    static final Logger LOG = 
LoggerFactory.getLogger(BookKeeperClusterTestCase.class);
 
-    String testName;
+    protected String testName;
 
     @BeforeMethod
     public void handleTestMethodName(Method method) {
@@ -110,10 +93,11 @@ public abstract class BookKeeperClusterTestCase {
     protected final ZooKeeperCluster zkUtil;
     protected ZooKeeper zkc;
     protected String metadataServiceUri;
+    protected FaultInjectionMetadataStore metadataStore;
 
     // BookKeeper related variables
     protected final TmpDirs tmpDirs = new TmpDirs();
-    private final List<ServerTester> servers = new LinkedList<>();
+    protected final List<ServerTester> servers = new LinkedList<>();
 
     protected int numBookies;
     protected BookKeeperTestClient bkc;
@@ -126,10 +110,9 @@ public abstract class BookKeeperClusterTestCase {
      */
     protected final ServerConfiguration baseConf = 
TestBKConfiguration.newServerConfiguration();
     protected final ClientConfiguration baseClientConf = 
TestBKConfiguration.newClientConfiguration();
-    private final ByteBufAllocatorWithOomHandler allocator = 
BookieResources.createAllocator(baseConf);
 
     private boolean isAutoRecoveryEnabled;
-
+    protected ExecutorService executor;
     private final List<Integer> bookiePorts = new ArrayList<>();
 
     SynchronousQueue<Throwable> asyncExceptions = new SynchronousQueue<>();
@@ -153,7 +136,7 @@ public abstract class BookKeeperClusterTestCase {
     public BookKeeperClusterTestCase(int numBookies, int numOfZKNodes, int 
testTimeoutSecs) {
         this.numBookies = numBookies;
         if (numOfZKNodes == 1) {
-            zkUtil = new ZooKeeperUtil();
+            zkUtil = new ZooKeeperUtil(getLedgersRootPath());
         } else {
             try {
                 zkUtil = new ZooKeeperClusterUtil(numOfZKNodes);
@@ -163,9 +146,9 @@ public abstract class BookKeeperClusterTestCase {
         }
     }
 
-    @BeforeMethod
+    @BeforeTest
     public void setUp() throws Exception {
-        setUp("/ledgers");
+        setUp(getLedgersRootPath());
     }
 
     protected void setUp(String ledgersRootPath) throws Exception {
@@ -173,6 +156,7 @@ public abstract class BookKeeperClusterTestCase {
         InMemoryMetaStore.reset();
         setMetastoreImplClass(baseConf);
         setMetastoreImplClass(baseClientConf);
+        executor = Executors.newCachedThreadPool();
 
         Stopwatch sw = Stopwatch.createStarted();
         try {
@@ -193,7 +177,15 @@ public abstract class BookKeeperClusterTestCase {
         return zkUtil.getMetadataServiceUri(ledgersRootPath);
     }
 
-    @AfterMethod(alwaysRun = true)
+    private String getLedgersRootPath() {
+        return changeLedgerPath() + "/ledgers";
+    }
+
+    protected String changeLedgerPath() {
+        return "";
+    }
+
+    @AfterTest(alwaysRun = true)
     public void tearDown() throws Exception {
         boolean failed = false;
         for (Throwable e : asyncExceptions) {
@@ -213,6 +205,8 @@ public abstract class BookKeeperClusterTestCase {
         }
         // stop zookeeper service
         try {
+            // cleanup for metrics.
+            metadataStore.close();
             stopZKCluster();
         } catch (Exception e) {
             LOG.error("Got Exception while trying to stop ZKCluster", e);
@@ -225,6 +219,9 @@ public abstract class BookKeeperClusterTestCase {
             LOG.error("Got Exception while trying to cleanupTempDirs", e);
             tearDownException = e;
         }
+
+        executor.shutdownNow();
+
         LOG.info("Tearing down test {} in {} ms.", testName, 
sw.elapsed(TimeUnit.MILLISECONDS));
         if (tearDownException != null) {
             throw tearDownException;
@@ -239,6 +236,9 @@ public abstract class BookKeeperClusterTestCase {
     protected void startZKCluster() throws Exception {
         zkUtil.startCluster();
         zkc = zkUtil.getZooKeeperClient();
+        metadataStore = new FaultInjectionMetadataStore(
+                
MetadataStoreExtended.create(zkUtil.getZooKeeperConnectString(),
+                MetadataStoreConfig.builder().build()));
     }
 
     /**
@@ -260,13 +260,14 @@ public abstract class BookKeeperClusterTestCase {
         baseConf.setMetadataServiceUri(metadataServiceUri);
         baseClientConf.setMetadataServiceUri(metadataServiceUri);
         baseClientConf.setAllocatorPoolingPolicy(PoolingPolicy.UnpooledHeap);
+
         if (numBookies > 0) {
             bkc = new BookKeeperTestClient(baseClientConf, new 
TestStatsProvider());
         }
 
         // Create Bookie Servers (B1, B2, B3)
         for (int i = 0; i < numBookies; i++) {
-            startNewBookie();
+            bookiePorts.add(startNewBookie());
         }
     }
 
@@ -486,7 +487,7 @@ public abstract class BookKeeperClusterTestCase {
     public ServerConfiguration killBookieAndWaitForZK(int index) throws 
Exception {
         ServerTester tester = servers.get(index); // IKTODO: this method is 
awful
         ServerConfiguration ret = killBookie(index);
-        while 
(zkc.exists(ZKMetadataDriverBase.resolveZkLedgersRootPath(baseConf) + "/" + 
AVAILABLE_NODE + "/"
+        while (zkc.exists("/ledgers/" + AVAILABLE_NODE + "/"
                 + tester.getServer().getBookieId().toString(), false) != null) 
{
             Thread.sleep(500);
         }
@@ -693,6 +694,20 @@ public abstract class BookKeeperClusterTestCase {
         if (isAutoRecoveryEnabled()) {
             tester.startAutoRecovery();
         }
+
+        int port = conf.getBookiePort();
+
+        Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> {
+            while (zkc.exists("/ledgers/" + AVAILABLE_NODE + "/"
+                    + tester.getServer().getBookieId().toString(), false) == 
null) {
+                Thread.sleep(100);
+            }
+            return true;
+        });
+        bkc.readBookiesBlocking();
+
+        LOG.info("New bookie on port " + port + " has been created.");
+
         return tester;
     }
 
@@ -714,15 +729,23 @@ public abstract class BookKeeperClusterTestCase {
         tester.getServer().start();
 
         waitForBookie.get(30, TimeUnit.SECONDS);
-        LOG.info("New bookie '{}' has been created.", address);
 
         if (isAutoRecoveryEnabled()) {
             tester.startAutoRecovery();
         }
+
+        int port = conf.getBookiePort();
+        Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() ->
+                metadataStore.exists(
+                getLedgersRootPath() + "/available/" + address).join()
+        );
+        bkc.readBookiesBlocking();
+
+        LOG.info("New bookie '{}' has been created.", address);
         return tester;
     }
 
-    public void setMetastoreImplClass(AbstractConfiguration<?> conf) {
+    public void setMetastoreImplClass(AbstractConfiguration conf) {
         conf.setMetastoreImplClass(InMemoryMetaStore.class.getName());
     }
 
@@ -789,7 +812,7 @@ public abstract class BookKeeperClusterTestCase {
                 // AutoRecovery daemon
                 if (a != null && a.isRunning()
                         && replicationWorker != null && 
replicationWorker.isRunning()) {
-                    LOG.info("Found Auditor Bookie {}", 
t.server.getBookieId());
+                    LOG.info("Found Auditor Bookie {}", 
t.getServer().getBookieId());
                     return a;
                 }
             }
@@ -825,150 +848,4 @@ public abstract class BookKeeperClusterTestCase {
         return servers.get(index).getStatsProvider();
     }
 
-    /**
-     * Class to encapsulate all the test objects.
-     */
-    public class ServerTester {
-        private final ServerConfiguration conf;
-        private final TestStatsProvider provider;
-        private final Bookie bookie;
-        private final BookieServer server;
-        private final BookieSocketAddress address;
-        private final MetadataBookieDriver metadataDriver;
-        private final RegistrationManager registrationManager;
-        private final LedgerManagerFactory lmFactory;
-        private final LedgerManager ledgerManager;
-        private final LedgerStorage storage;
-
-        private AutoRecoveryMain autoRecovery;
-
-        ServerTester(ServerConfiguration conf) throws Exception {
-            this.conf = conf;
-            provider = new TestStatsProvider();
-
-            StatsLogger rootStatsLogger = provider.getStatsLogger("");
-            StatsLogger bookieStats = rootStatsLogger.scope(BOOKIE_SCOPE);
-
-            metadataDriver = BookieResources.createMetadataDriver(conf, 
bookieStats);
-            registrationManager = metadataDriver.createRegistrationManager();
-            lmFactory = metadataDriver.getLedgerManagerFactory();
-            ledgerManager = lmFactory.newLedgerManager();
-
-            LegacyCookieValidation cookieValidation = new 
LegacyCookieValidation(
-                    conf, registrationManager);
-            
cookieValidation.checkCookies(Main.storageDirectoriesFromConf(conf));
-
-            DiskChecker diskChecker = BookieResources.createDiskChecker(conf);
-            LedgerDirsManager ledgerDirsManager = 
BookieResources.createLedgerDirsManager(
-                    conf, diskChecker, bookieStats.scope(LD_LEDGER_SCOPE));
-            LedgerDirsManager indexDirsManager = 
BookieResources.createIndexDirsManager(
-                    conf, diskChecker, bookieStats.scope(LD_INDEX_SCOPE), 
ledgerDirsManager);
-
-            UncleanShutdownDetection uncleanShutdownDetection = new 
UncleanShutdownDetectionImpl(ledgerDirsManager);
-
-            storage = BookieResources.createLedgerStorage(
-                    conf, ledgerManager, ledgerDirsManager, indexDirsManager,
-                    bookieStats, allocator);
-
-            if (conf.isForceReadOnlyBookie()) {
-                bookie = new ReadOnlyBookie(conf, registrationManager, storage,
-                        diskChecker, ledgerDirsManager, indexDirsManager,
-                        bookieStats, allocator, BookieServiceInfo.NO_INFO);
-            } else {
-                bookie = new BookieImpl(conf, registrationManager, storage,
-                        diskChecker, ledgerDirsManager, indexDirsManager,
-                        bookieStats, allocator, BookieServiceInfo.NO_INFO);
-            }
-            server = new BookieServer(conf, bookie, rootStatsLogger, allocator,
-                    uncleanShutdownDetection);
-            address = BookieImpl.getBookieAddress(conf);
-
-            autoRecovery = null;
-        }
-
-        ServerTester(ServerConfiguration conf, Bookie b) throws Exception {
-            this.conf = conf;
-            provider = new TestStatsProvider();
-
-            metadataDriver = null;
-            registrationManager = null;
-            ledgerManager = null;
-            lmFactory = null;
-            storage = null;
-
-            bookie = b;
-            server = new BookieServer(conf, b, provider.getStatsLogger(""),
-                    allocator, new MockUncleanShutdownDetection());
-            address = BookieImpl.getBookieAddress(conf);
-
-            autoRecovery = null;
-        }
-
-        void startAutoRecovery() throws Exception {
-            LOG.debug("Starting Auditor Recovery for the bookie: {}", address);
-            autoRecovery = new AutoRecoveryMain(conf);
-            autoRecovery.start();
-        }
-
-        void stopAutoRecovery() {
-            if (autoRecovery != null) {
-                LOG.debug("Shutdown Auditor Recovery for the bookie: {}", 
address);
-                autoRecovery.shutdown();
-            }
-        }
-
-        Auditor getAuditor() {
-            if (autoRecovery != null) {
-                return autoRecovery.getAuditor();
-            } else {
-                return null;
-            }
-        }
-
-        ReplicationWorker getReplicationWorker() {
-            if (autoRecovery != null) {
-                return autoRecovery.getReplicationWorker();
-            } else {
-                return null;
-            }
-        }
-
-        ServerConfiguration getConfiguration() {
-            return conf;
-        }
-
-        public BookieServer getServer() {
-            return server;
-        }
-
-        TestStatsProvider getStatsProvider() {
-            return provider;
-        }
-
-        BookieSocketAddress getAddress() {
-            return address;
-        }
-
-        void shutdown() throws Exception {
-            server.shutdown();
-
-            if (ledgerManager != null) {
-                ledgerManager.close();
-            }
-            if (lmFactory != null) {
-                lmFactory.close();
-            }
-            if (registrationManager != null) {
-                registrationManager.close();
-            }
-            if (metadataDriver != null) {
-                metadataDriver.close();
-            }
-
-            if (autoRecovery != null) {
-                LOG.debug("Shutdown auto recovery for bookieserver: {}", 
address);
-                autoRecovery.shutdown();
-            }
-        }
-    }
-}
\ No newline at end of file
+}
diff --git 
a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java
 
b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java
index c8c76302b89..888303d3e66 100644
--- 
a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java
+++ 
b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java
@@ -44,7 +44,6 @@ import 
org.apache.bookkeeper.meta.ZkLedgerUnderreplicationManager;
 import org.apache.bookkeeper.net.BookieId;
 import org.apache.bookkeeper.proto.BookieServer;
 import org.apache.bookkeeper.stats.NullStatsLogger;
-import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
diff --git 
a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/BookieLedgerIndexTest.java
 
b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/BookieLedgerIndexTest.java
index eb9f95ffdf7..1d5cf868cce 100644
--- 
a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/BookieLedgerIndexTest.java
+++ 
b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/BookieLedgerIndexTest.java
@@ -35,7 +35,6 @@ import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.meta.LayoutManager;
 import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.meta.LedgerManagerFactory;
-import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.apache.pulsar.metadata.api.MetadataStoreConfig;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 import org.apache.pulsar.metadata.bookkeeper.PulsarLayoutManager;
diff --git 
a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/TestAutoRecoveryAlongWithBookieServers.java
 
b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/TestAutoRecoveryAlongWithBookieServers.java
index 8a2e7f2747a..11797c83737 100644
--- 
a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/TestAutoRecoveryAlongWithBookieServers.java
+++ 
b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/TestAutoRecoveryAlongWithBookieServers.java
@@ -27,7 +27,6 @@ import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerEntry;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.net.BookieId;
-import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.apache.bookkeeper.util.BookKeeperConstants;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
diff --git 
a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
 
b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
index ca02f91d1de..7938feaba19 100644
--- 
a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
+++ 
b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
@@ -78,7 +78,6 @@ import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.Gauge;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.apache.bookkeeper.test.TestStatsProvider;
 import org.apache.bookkeeper.test.TestStatsProvider.TestStatsLogger;
 import org.apache.bookkeeper.util.BookKeeperConstants;
diff --git 
a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/ZooKeeperUtil.java
 
b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/ZooKeeperUtil.java
new file mode 100644
index 00000000000..5113edb72c4
--- /dev/null
+++ 
b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/ZooKeeperUtil.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * This file is derived from ZooKeeperUtil from Apache BookKeeper
+ * http://bookkeeper.apache.org
+ */
+
+package org.apache.bookkeeper.replication;
+
+import static org.testng.Assert.assertTrue;
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.test.ZooKeeperCluster;
+import org.apache.bookkeeper.util.IOUtils;
+import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
+import org.apache.commons.io.FileUtils;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.NIOServerCnxnFactory;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.test.ClientBase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test the zookeeper utilities.
+ */
+public class ZooKeeperUtil implements ZooKeeperCluster {
+
+    static {
+        // org.apache.zookeeper.test.ClientBase uses FourLetterWordMain, from 
3.5.3 four letter words
+        // are disabled by default due to security reasons
+        System.setProperty("zookeeper.4lw.commands.whitelist", "*");
+    }
+    static final Logger LOG = LoggerFactory.getLogger(ZooKeeperUtil.class);
+
+    // ZooKeeper related variables
+    protected Integer zooKeeperPort = 0;
+    private InetSocketAddress zkaddr;
+
+    protected ZooKeeperServer zks;
+    protected ZooKeeper zkc; // zookeeper client
+    protected NIOServerCnxnFactory serverFactory;
+    protected File zkTmpDir;
+    private String connectString;
+    private String ledgersRootPath;
+
+    public ZooKeeperUtil(String ledgersRootPath) {
+        this.ledgersRootPath = ledgersRootPath;
+        String loopbackIPAddr = 
InetAddress.getLoopbackAddress().getHostAddress();
+        zkaddr = new InetSocketAddress(loopbackIPAddr, 0);
+        connectString = loopbackIPAddr + ":" + zooKeeperPort;
+    }
+
+    @Override
+    public ZooKeeper getZooKeeperClient() {
+        return zkc;
+    }
+
+    @Override
+    public String getZooKeeperConnectString() {
+        return connectString;
+    }
+
+    @Override
+    public String getMetadataServiceUri() {
+        return getMetadataServiceUri("/ledgers");
+    }
+
+    @Override
+    public String getMetadataServiceUri(String zkLedgersRootPath) {
+        return "zk://" + connectString + zkLedgersRootPath;
+    }
+
+    @Override
+    public String getMetadataServiceUri(String zkLedgersRootPath, String type) 
{
+        return "zk+" + type + "://" + connectString + zkLedgersRootPath;
+    }
+
+    @Override
+    public void startCluster() throws Exception {
+        // create a ZooKeeper server(dataDir, dataLogDir, port)
+        LOG.debug("Running ZK server");
+        ClientBase.setupTestEnv();
+        zkTmpDir = IOUtils.createTempDir("zookeeper", "test");
+
+        // start the server and client.
+        restartCluster();
+
+        // create default bk ensemble
+        createBKEnsemble(ledgersRootPath);
+    }
+
+    @Override
+    public void createBKEnsemble(String ledgersPath) throws KeeperException, 
InterruptedException {
+        int last = ledgersPath.lastIndexOf('/');
+        if (last > 0) {
+            String pathToCreate = ledgersPath.substring(0, last);
+            CompletableFuture<Void> future = new CompletableFuture<>();
+            if (zkc.exists(pathToCreate, false) == null) {
+                ZkUtils.asyncCreateFullPathOptimistic(zkc,
+                        pathToCreate,
+                        new byte[0],
+                        ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                        CreateMode.PERSISTENT, (i, s, o, s1) -> {
+                            future.complete(null);
+                        }, null);
+            }
+            future.join();
+        }
+
+        ZooKeeperCluster.super.createBKEnsemble(ledgersPath);
+    }
+    @Override
+    public void restartCluster() throws Exception {
+        zks = new ZooKeeperServer(zkTmpDir, zkTmpDir,
+                ZooKeeperServer.DEFAULT_TICK_TIME);
+        serverFactory = new NIOServerCnxnFactory();
+        serverFactory.configure(zkaddr, 100);
+        serverFactory.startup(zks);
+
+        if (0 == zooKeeperPort) {
+            zooKeeperPort = serverFactory.getLocalPort();
+            zkaddr = new 
InetSocketAddress(zkaddr.getAddress().getHostAddress(), zooKeeperPort);
+            connectString = zkaddr.getAddress().getHostAddress() + ":" + 
zooKeeperPort;
+        }
+
+        boolean b = ClientBase.waitForServerUp(getZooKeeperConnectString(),
+                ClientBase.CONNECTION_TIMEOUT);
+        LOG.debug("Server up: " + b);
+
+        // create a zookeeper client
+        LOG.debug("Instantiate ZK Client");
+        zkc = ZooKeeperClient.newBuilder()
+                .connectString(getZooKeeperConnectString())
+                .sessionTimeoutMs(10000)
+                .build();
+    }
+
+    @Override
+    public void sleepCluster(final int time,
+                             final TimeUnit timeUnit,
+                             final CountDownLatch l)
+            throws InterruptedException, IOException {
+        Thread[] allthreads = new Thread[Thread.activeCount()];
+        Thread.enumerate(allthreads);
+        for (final Thread t : allthreads) {
+            if (t.getName().contains("SyncThread:0")) {
+                Thread sleeper = new Thread() {
+                    @SuppressWarnings("deprecation")
+                    public void run() {
+                        try {
+                            t.suspend();
+                            l.countDown();
+                            timeUnit.sleep(time);
+                            t.resume();
+                        } catch (Exception e) {
+                            LOG.error("Error suspending thread", e);
+                        }
+                    }
+                };
+                sleeper.start();
+                return;
+            }
+        }
+        throw new IOException("ZooKeeper thread not found");
+    }
+
+    @Override
+    public void stopCluster() throws Exception {
+        if (zkc != null) {
+            zkc.close();
+        }
+
+        // shutdown ZK server
+        if (serverFactory != null) {
+            serverFactory.shutdown();
+            
assertTrue(ClientBase.waitForServerDown(getZooKeeperConnectString(), 
ClientBase.CONNECTION_TIMEOUT),
+                    "waiting for server down");
+        }
+        if (zks != null) {
+            zks.getTxnLogFactory().close();
+        }
+    }
+
+    @Override
+    public void killCluster() throws Exception {
+        stopCluster();
+        FileUtils.deleteDirectory(zkTmpDir);
+    }
+}
diff --git 
a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/BookKeeperClusterTestCase.java
 
b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/BookKeeperClusterTestCase.java
index 40c2041d4e6..43db5ad4ba8 100644
--- 
a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/BookKeeperClusterTestCase.java
+++ 
b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/BookKeeperClusterTestCase.java
@@ -70,7 +70,6 @@ import org.apache.bookkeeper.discover.RegistrationManager;
 import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.meta.LedgerManagerFactory;
 import org.apache.bookkeeper.meta.MetadataBookieDriver;
-import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
 import org.apache.bookkeeper.metastore.InMemoryMetaStore;
 import org.apache.bookkeeper.net.BookieId;
 import org.apache.bookkeeper.net.BookieSocketAddress;
@@ -486,7 +485,7 @@ public abstract class BookKeeperClusterTestCase {
     public ServerConfiguration killBookieAndWaitForZK(int index) throws 
Exception {
         ServerTester tester = servers.get(index); // IKTODO: this method is 
awful
         ServerConfiguration ret = killBookie(index);
-        while 
(zkc.exists(ZKMetadataDriverBase.resolveZkLedgersRootPath(baseConf) + "/" + 
AVAILABLE_NODE + "/"
+        while (zkc.exists("/ledgers/" + AVAILABLE_NODE + "/"
                 + tester.getServer().getBookieId().toString(), false) != null) 
{
             Thread.sleep(500);
         }
diff --git 
a/testmocks/src/main/java/org/apache/bookkeeper/client/BookKeeperTestClient.java
 
b/testmocks/src/main/java/org/apache/bookkeeper/client/BookKeeperTestClient.java
index d023427e3be..dd33c2c4532 100644
--- 
a/testmocks/src/main/java/org/apache/bookkeeper/client/BookKeeperTestClient.java
+++ 
b/testmocks/src/main/java/org/apache/bookkeeper/client/BookKeeperTestClient.java
@@ -52,7 +52,6 @@ public class BookKeeperTestClient extends BookKeeper {
             throws IOException, InterruptedException, BKException {
         super(conf, zkc, null, new UnpooledByteBufAllocator(false),
                 NullStatsLogger.INSTANCE, null, null, null);
-        this.statsProvider = statsProvider;
     }
 
     public BookKeeperTestClient(ClientConfiguration conf)


Reply via email to