This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 67ae209e995017fe41ac6a9c998aa7e63576c837
Author: Lari Hotari <[email protected]>
AuthorDate: Mon Feb 17 18:54:43 2025 +0200

    [fix][meta] Fix ephemeral handling of ZK nodes and fix MockZooKeeper ephemeral and ZK stat handling (#23988)
    
    (cherry picked from commit df5197212e8806c7d1907dedfcdfd9e40a4f0ea5)
---
 .../broker/loadbalance/impl/LoadManagerShared.java |    4 +-
 .../loadbalance/impl/ModularLoadManagerImpl.java   |    2 +-
 .../pulsar/broker/web/PulsarWebResource.java       |    2 +-
 .../pulsar/common/naming/NamespaceBundles.java     |    8 +-
 .../broker/auth/MockedPulsarServiceBaseTest.java   |   27 +-
 .../broker/testcontext/PulsarTestContext.java      |   93 +-
 .../pulsar/client/api/BrokerServiceLookupTest.java |  113 +-
 .../pulsar/client/api/ProducerConsumerBase.java    |    2 +-
 .../pulsar/metadata/impl/ZKMetadataStore.java      |   14 +-
 .../replication/BookKeeperClusterTestCase.java     |    4 +
 .../pulsar/metadata/BaseMetadataStoreTest.java     |  124 +-
 .../org/apache/pulsar/metadata/CounterTest.java    |    3 +-
 .../apache/pulsar/metadata/MetadataCacheTest.java  |   29 +-
 .../pulsar/metadata/MetadataStoreExtendedTest.java |    6 +-
 .../apache/pulsar/metadata/MetadataStoreTest.java  |   28 +-
 .../MockZooKeeperMetadataStoreProvider.java        |   49 +
 .../bookkeeper/LedgerManagerIteratorTest.java      |   18 +-
 .../LedgerUnderreplicationManagerTest.java         |    8 +-
 .../bookkeeper/PulsarLedgerIdGeneratorTest.java    |    2 +-
 .../impl/MetadataStoreFactoryImplTest.java         |   21 +-
 .../java/org/apache/zookeeper/MockZooKeeper.java   | 1199 ++++++++++----------
 .../org/apache/zookeeper/MockZooKeeperSession.java |  201 +++-
 22 files changed, 1202 insertions(+), 755 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
index 3d627db6cfa..db2122331a5 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
@@ -305,7 +305,7 @@ public class LoadManagerShared {
     public static String getBundleRangeFromBundleName(String bundleName) {
         // the bundle format is 
property/cluster/namespace/0x00000000_0xFFFFFFFF
         int pos = bundleName.lastIndexOf("/");
-        checkArgument(pos != -1);
+        checkArgument(pos != -1, "Invalid bundle name format: %s", bundleName);
         return bundleName.substring(pos + 1);
     }
 
@@ -313,7 +313,7 @@ public class LoadManagerShared {
     public static String getNamespaceNameFromBundleName(String bundleName) {
         // the bundle format is 
property/cluster/namespace/0x00000000_0xFFFFFFFF
         int pos = bundleName.lastIndexOf('/');
-        checkArgument(pos != -1);
+        checkArgument(pos != -1, "Invalid bundle name format: %s", bundleName);
         return bundleName.substring(0, pos);
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index 52706a1b02b..2c74e0ffca2 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -614,7 +614,7 @@ public class ModularLoadManagerImpl implements 
ModularLoadManager {
         for (String bundle : bundleData.keySet()) {
             if (!activeBundles.contains(bundle)){
                 bundleData.remove(bundle);
-                if (pulsar.getLeaderElectionService().isLeader()){
+                if (pulsar.getLeaderElectionService() != null && 
pulsar.getLeaderElectionService().isLeader()){
                     deleteBundleDataFromMetadataStore(bundle);
                 }
             }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index f9b1f0d08c0..914ca0b82bb 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -1016,7 +1016,7 @@ public abstract class PulsarWebResource {
         if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
             return true;
         }
-        return  pulsar.getLeaderElectionService().isLeader();
+        return pulsar.getLeaderElectionService() != null && 
pulsar.getLeaderElectionService().isLeader();
     }
 
     public void validateTenantOperation(String tenant, TenantOperation 
operation) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java
index cb7e135662c..900aa5336ba 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java
@@ -113,9 +113,11 @@ public class NamespaceBundles {
 
     public void validateBundle(NamespaceBundle nsBundle) throws Exception {
         int idx = Arrays.binarySearch(partitions, nsBundle.getLowerEndpoint());
-        checkArgument(idx >= 0, "Cannot find bundle in the bundles list");
-        
checkArgument(nsBundle.getUpperEndpoint().equals(bundles.get(idx).getUpperEndpoint()),
-                "Invalid upper boundary for bundle");
+        checkArgument(idx >= 0, "Cannot find bundle %s in the bundles list", 
nsBundle);
+        NamespaceBundle foundBundle = bundles.get(idx);
+        Long upperEndpoint = foundBundle.getUpperEndpoint();
+        checkArgument(nsBundle.getUpperEndpoint().equals(upperEndpoint),
+                "Invalid upper boundary for bundle %s. Expected upper boundary 
of %s", nsBundle, foundBundle);
     }
 
     public NamespaceBundle getFullBundle() {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index 29d350bbf90..377b183efda 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -141,6 +141,9 @@ public abstract class MockedPulsarServiceBaseTest extends 
TestRetrySupport {
 
     protected boolean enableBrokerInterceptor = false;
 
+    // Set to true in test's constructor to use a real Zookeeper (TestZKServer)
+    protected boolean useTestZookeeper;
+
     public MockedPulsarServiceBaseTest() {
         resetConfig();
     }
@@ -309,7 +312,14 @@ public abstract class MockedPulsarServiceBaseTest extends 
TestRetrySupport {
      * @throws Exception if an error occurs
      */
     protected void restartBroker() throws Exception {
+        restartBroker(null);
+    }
+
+    protected void restartBroker(Consumer<ServiceConfiguration> 
configurationChanger) throws Exception {
         stopBroker();
+        if (configurationChanger != null) {
+            configurationChanger.accept(conf);
+        }
         startBroker();
     }
 
@@ -400,7 +410,6 @@ public abstract class MockedPulsarServiceBaseTest extends 
TestRetrySupport {
         PulsarTestContext.Builder builder = PulsarTestContext.builder()
                 .spyByDefault()
                 .config(conf)
-                .withMockZookeeper(true)
                 .pulsarServiceCustomizer(pulsarService -> {
                     try {
                         beforePulsarStart(pulsarService);
@@ -409,9 +418,25 @@ public abstract class MockedPulsarServiceBaseTest extends 
TestRetrySupport {
                     }
                 })
                 .brokerServiceCustomizer(this::customizeNewBrokerService);
+        configureMetadataStores(builder);
         return builder;
     }
 
+    /**
+     * Configures the metadata stores for the PulsarTestContext.Builder 
instance.
+     * Set useTestZookeeper to true in the test's constructor to use 
TestZKServer which is a real ZooKeeper
+     * implementation.
+     *
+     * @param builder the PulsarTestContext.Builder instance to configure
+     */
+    protected void configureMetadataStores(PulsarTestContext.Builder builder) {
+        if (useTestZookeeper) {
+            builder.withTestZookeeper();
+        } else {
+            builder.withMockZookeeper(true);
+        }
+    }
+
     /**
      * This method can be used in test classes for creating additional 
PulsarTestContext instances
      * that share the same mock ZooKeeper and BookKeeper instances as the main 
PulsarTestContext instance.
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java
index d62600829da..feb0be3d947 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java
@@ -19,12 +19,10 @@
 
 package org.apache.pulsar.broker.testcontext;
 
-import com.google.common.util.concurrent.MoreExecutors;
 import io.netty.channel.EventLoopGroup;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.time.Duration;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
 import java.util.function.Consumer;
@@ -33,6 +31,7 @@ import lombok.AccessLevel;
 import lombok.Builder;
 import lombok.Getter;
 import lombok.Singular;
+import lombok.SneakyThrows;
 import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.BookKeeper;
@@ -56,6 +55,7 @@ import org.apache.pulsar.broker.storage.ManagedLedgerStorage;
 import org.apache.pulsar.common.util.GracefulExecutorServicesShutdown;
 import org.apache.pulsar.common.util.PortManager;
 import org.apache.pulsar.compaction.Compactor;
+import org.apache.pulsar.metadata.TestZKServer;
 import org.apache.pulsar.metadata.api.MetadataStore;
 import org.apache.pulsar.metadata.api.MetadataStoreConfig;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
@@ -63,9 +63,11 @@ import 
org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 import org.apache.pulsar.metadata.impl.MetadataStoreFactoryImpl;
 import org.apache.pulsar.metadata.impl.ZKMetadataStore;
 import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.MockZooKeeper;
 import org.apache.zookeeper.MockZooKeeperSession;
-import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
 import org.jetbrains.annotations.NotNull;
 import org.mockito.Mockito;
 import org.mockito.internal.util.MockUtil;
@@ -150,6 +152,10 @@ public class PulsarTestContext implements AutoCloseable {
 
     private final MockZooKeeper mockZooKeeperGlobal;
 
+    private final TestZKServer testZKServer;
+
+    private final TestZKServer testZKServerGlobal;
+
     private final SpyConfig spyConfig;
 
     private final boolean startable;
@@ -377,6 +383,11 @@ public class PulsarTestContext implements AutoCloseable {
                 if (otherContext.getMockZooKeeperGlobal() != null) {
                     mockZooKeeperGlobal(otherContext.getMockZooKeeperGlobal());
                 }
+            } else if (otherContext.getTestZKServer() != null) {
+                testZKServer(otherContext.getTestZKServer());
+                if (otherContext.getTestZKServerGlobal() != null) {
+                    testZKServerGlobal(otherContext.getTestZKServerGlobal());
+                }
             } else {
                 
localMetadataStore(NonClosingProxyHandler.createNonClosingProxy(otherContext.getLocalMetadataStore(),
                         MetadataStoreExtended.class
@@ -436,17 +447,56 @@ public class PulsarTestContext implements AutoCloseable {
         }
 
         private MockZooKeeper createMockZooKeeper() throws Exception {
-            MockZooKeeper zk = 
MockZooKeeper.newInstance(MoreExecutors.newDirectExecutorService());
-            List<ACL> dummyAclList = new ArrayList<>(0);
+            MockZooKeeper zk = MockZooKeeper.newInstance();
+            initializeZookeeper(zk);
+            registerCloseable(zk::shutdown);
+            return zk;
+        }
 
+        private static void initializeZookeeper(ZooKeeper zk) throws 
KeeperException, InterruptedException {
             ZkUtils.createFullPathOptimistic(zk, 
"/ledgers/available/192.168.1.1:" + 5000,
-                    "".getBytes(StandardCharsets.UTF_8), dummyAclList, 
CreateMode.PERSISTENT);
+                    "".getBytes(StandardCharsets.UTF_8), 
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
 
-            zk.create("/ledgers/LAYOUT", 
"1\nflat:1".getBytes(StandardCharsets.UTF_8), dummyAclList,
+            zk.create("/ledgers/LAYOUT", 
"1\nflat:1".getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE,
                     CreateMode.PERSISTENT);
+        }
 
-            registerCloseable(zk::shutdown);
-            return zk;
+        /**
+         * Configure this PulsarTestContext to use a test ZooKeeper instance 
which is
+         * shared for both the local and configuration metadata stores.
+         *
+         * @return the builder
+         */
+        public Builder withTestZookeeper() {
+            return withTestZookeeper(false);
+        }
+
+        /**
+         * Configure this PulsarTestContext to use a test ZooKeeper instance.
+         *
+         * @param useSeparateGlobalZk if true, the global (configuration) 
zookeeper will be a separate instance
+         * @return the builder
+         */
+        public Builder withTestZookeeper(boolean useSeparateGlobalZk) {
+            try {
+                testZKServer(createTestZookeeper());
+                if (useSeparateGlobalZk) {
+                    testZKServerGlobal(createTestZookeeper());
+                }
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+            return this;
+        }
+
+        private TestZKServer createTestZookeeper() throws Exception {
+            TestZKServer testZKServer = new TestZKServer();
+            try (ZooKeeper zkc = new 
ZooKeeper(testZKServer.getConnectionString(), 5000, event -> {
+            })) {
+                initializeZookeeper(zkc);
+            }
+            registerCloseable(testZKServer);
+            return testZKServer;
         }
 
         /**
@@ -628,6 +678,20 @@ public class PulsarTestContext implements AutoCloseable {
                             
configurationMetadataStore(mockZookeeperMetadataStore);
                         }
                     }
+                } else if (super.testZKServer != null) {
+                    MetadataStoreExtended testZookeeperMetadataStore =
+                            
createTestZookeeperMetadataStore(super.testZKServer, 
MetadataStoreConfig.METADATA_STORE);
+                    if (super.localMetadataStore == null) {
+                        localMetadataStore(testZookeeperMetadataStore);
+                    }
+                    if (super.configurationMetadataStore == null) {
+                        if (super.testZKServerGlobal != null) {
+                            
configurationMetadataStore(createTestZookeeperMetadataStore(super.testZKServerGlobal,
+                                    
MetadataStoreConfig.CONFIGURATION_METADATA_STORE));
+                        } else {
+                            
configurationMetadataStore(testZookeeperMetadataStore);
+                        }
+                    }
                 } else {
                     try {
                         MetadataStoreExtended store = 
MetadataStoreFactoryImpl.createExtended("memory:local",
@@ -672,6 +736,17 @@ public class PulsarTestContext implements AutoCloseable {
             return nonClosingProxy;
         }
 
+        @SneakyThrows
+        private MetadataStoreExtended 
createTestZookeeperMetadataStore(TestZKServer zkServer,
+                                                                       String 
metadataStoreName) {
+            MetadataStoreExtended store = MetadataStoreExtended.create("zk:" + 
zkServer.getConnectionString(),
+                    
MetadataStoreConfig.builder().metadataStoreName(metadataStoreName).build());
+            registerCloseable(store);
+            MetadataStoreExtended nonClosingProxy =
+                    NonClosingProxyHandler.createNonClosingProxy(store, 
MetadataStoreExtended.class);
+            return nonClosingProxy;
+        }
+
         protected abstract void initializePulsarServices(SpyConfig spyConfig, 
Builder builder);
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
index f132aef96bd..4ceb4d20e53 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
@@ -88,6 +88,7 @@ import org.apache.pulsar.common.naming.ServiceUnitId;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.PoliciesUtil;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.common.util.SecurityUtility;
@@ -108,15 +109,41 @@ import org.awaitility.Awaitility;
 import org.awaitility.reflect.WhiteboxImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.testng.ITest;
+import org.testng.SkipException;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Factory;
 import org.testng.annotations.Test;
 
 @Test(groups = "broker-api")
-public class BrokerServiceLookupTest extends ProducerConsumerBase {
+public class BrokerServiceLookupTest extends ProducerConsumerBase implements 
ITest {
     private static final Logger log = 
LoggerFactory.getLogger(BrokerServiceLookupTest.class);
+    private String testName;
+
+    @DataProvider
+    private static Object[] booleanValues() {
+        return new Object[]{ true, false };
+    }
+
+    @Factory(dataProvider = "booleanValues")
+    public BrokerServiceLookupTest(boolean useTestZookeeper) {
+        // when set to true, TestZKServer is used which is a real ZooKeeper 
implementation
+        this.useTestZookeeper = useTestZookeeper;
+    }
+
+    @Override
+    public String getTestName() {
+        return testName;
+    }
 
     @BeforeMethod
+    public void applyTestName(Method method) {
+        testName = method.getName() + " with " + (useTestZookeeper ? 
"TestZKServer" : "MockZooKeeper");
+    }
+
+    @BeforeMethod(dependsOnMethods = "setTestMethodName")
     @Override
     protected void setup() throws Exception {
         conf.setDefaultNumberOfNamespaceBundles(1);
@@ -125,10 +152,43 @@ public class BrokerServiceLookupTest extends 
ProducerConsumerBase {
         producerBaseSetup();
     }
 
+    @Override
+    protected void doInitConf() throws Exception {
+        super.doInitConf();
+        switch (methodName) {
+            case "testMultipleBrokerDifferentClusterLookup" -> {
+                conf.setAuthenticationEnabled(true);
+            }
+            case "testWebserviceServiceTls" -> {
+                // broker1 with tls enabled
+                conf.setBrokerServicePortTls(Optional.of(0));
+                conf.setWebServicePortTls(Optional.of(0));
+                conf.setTlsTrustCertsFilePath(CA_CERT_FILE_PATH);
+                conf.setTlsRequireTrustedClientCertOnConnect(true);
+                conf.setTlsCertificateFilePath(BROKER_CERT_FILE_PATH);
+                conf.setTlsKeyFilePath(BROKER_KEY_FILE_PATH);
+                conf.setNumExecutorThreadPoolSize(5);
+                // Not in use, and because TLS is not configured, it will fail 
to start
+                conf.setSystemTopicEnabled(false);
+            }
+            case "testSkipSplitBundleIfOnlyOneBroker" -> {
+                conf.setDefaultNumberOfNamespaceBundles(1);
+                conf.setLoadBalancerNamespaceBundleMaxTopics(1);
+                
conf.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
+            }
+            case "testPartitionedMetadataWithDeprecatedVersion" -> {
+                conf.setBrokerServicePortTls(Optional.empty());
+                conf.setWebServicePortTls(Optional.empty());
+                conf.setClientLibraryVersionCheckEnabled(true);
+            }
+        }
+    }
+
     @AfterMethod(alwaysRun = true)
     @Override
     protected void cleanup() throws Exception {
         internalCleanup();
+        testName = null;
     }
 
     /**
@@ -214,9 +274,11 @@ public class BrokerServiceLookupTest extends 
ProducerConsumerBase {
     @Test
     public void testConcurrentWriteBrokerData() throws Exception {
         Map<String, NamespaceBundleStats> map = new ConcurrentHashMap<>();
+        List<String> boundaries = PoliciesUtil.getBundles(100).getBoundaries();
         for (int i = 0; i < 100; i++) {
-            map.put("key"+ i, new NamespaceBundleStats());
+            map.put("my-property/my-ns/" + boundaries.get(i), new 
NamespaceBundleStats());
         }
+        BrokerService originalBrokerService = pulsar.getBrokerService();
         BrokerService brokerService = mock(BrokerService.class);
         doReturn(brokerService).when(pulsar).getBrokerService();
         doReturn(map).when(brokerService).getBundleStats();
@@ -247,6 +309,8 @@ public class BrokerServiceLookupTest extends 
ProducerConsumerBase {
         for (Future<?> future : list) {
             future.get();
         }
+        // allow proper shutdown so that resources aren't leaked
+        doReturn(originalBrokerService).when(pulsar).getBrokerService();
     }
 
     /**
@@ -294,12 +358,6 @@ public class BrokerServiceLookupTest extends 
ProducerConsumerBase {
         @Cleanup
         PulsarClient pulsarClient2 = 
PulsarClient.builder().serviceUrl(brokerServiceUrl.toString()).build();
 
-        // enable authorization: so, broker can validate cluster and redirect 
if finds different cluster
-        pulsar.getConfiguration().setAuthorizationEnabled(true);
-        // restart broker with authorization enabled: it initialize 
AuthorizationService
-        stopBroker();
-        startBroker();
-
         LoadManager loadManager2 = spy(pulsar2.getLoadManager().get());
         Field loadManagerField = 
NamespaceService.class.getDeclaredField("loadManager");
         loadManagerField.setAccessible(true);
@@ -336,10 +394,6 @@ public class BrokerServiceLookupTest extends 
ProducerConsumerBase {
         consumer.acknowledgeCumulative(msg);
         consumer.close();
         producer.close();
-
-        // disable authorization
-        pulsar.getConfiguration().setAuthorizationEnabled(false);
-        loadManager2 = null;
     }
 
     /**
@@ -457,18 +511,6 @@ public class BrokerServiceLookupTest extends 
ProducerConsumerBase {
         PulsarTestContext pulsarTestContext2 = 
createAdditionalPulsarTestContext(conf2);
         PulsarService pulsar2 = pulsarTestContext2.getPulsarService();
 
-        // restart broker1 with tls enabled
-        conf.setBrokerServicePortTls(Optional.of(0));
-        conf.setWebServicePortTls(Optional.of(0));
-        conf.setTlsTrustCertsFilePath(CA_CERT_FILE_PATH);
-        conf.setTlsRequireTrustedClientCertOnConnect(true);
-        conf.setTlsCertificateFilePath(BROKER_CERT_FILE_PATH);
-        conf.setTlsKeyFilePath(BROKER_KEY_FILE_PATH);
-        conf.setNumExecutorThreadPoolSize(5);
-        // Not in use, and because TLS is not configured, it will fail to start
-        conf.setSystemTopicEnabled(false);
-        stopBroker();
-        startBroker();
         pulsar.getLoadManager().get().writeLoadReportOnZookeeper();
         pulsar2.getLoadManager().get().writeLoadReportOnZookeeper();
 
@@ -672,11 +714,6 @@ public class BrokerServiceLookupTest extends 
ProducerConsumerBase {
             conf2.setLoadBalancerAutoUnloadSplitBundlesEnabled(true);
             conf2.setLoadBalancerNamespaceBundleMaxTopics(1);
 
-            // configure broker-1 with ModularLoadManager
-            stopBroker();
-            
conf.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
-            startBroker();
-
             @Cleanup
             PulsarTestContext pulsarTestContext2 = 
createAdditionalPulsarTestContext(conf2);
             PulsarService pulsar2 = pulsarTestContext2.getPulsarService();
@@ -794,12 +831,6 @@ public class BrokerServiceLookupTest extends 
ProducerConsumerBase {
         final String topicName1 = BrokerTestUtil.newUniqueName("persistent://" 
+ namespace + "/tp_");
         final String topicName2 = BrokerTestUtil.newUniqueName("persistent://" 
+ namespace + "/tp_");
         try {
-            // configure broker with ModularLoadManager.
-            stopBroker();
-            conf.setDefaultNumberOfNamespaceBundles(1);
-            conf.setLoadBalancerNamespaceBundleMaxTopics(1);
-            
conf.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
-            startBroker();
             final ModularLoadManagerWrapper modularLoadManagerWrapper =
                     (ModularLoadManagerWrapper) pulsar.getLoadManager().get();
             final ModularLoadManagerImpl modularLoadManager =
@@ -952,12 +983,6 @@ public class BrokerServiceLookupTest extends 
ProducerConsumerBase {
         admin.namespaces().createNamespace(property + "/" + cluster + "/" + 
namespace);
         admin.topics().createPartitionedTopic(dest.toString(), 
totalPartitions);
 
-        stopBroker();
-        conf.setBrokerServicePortTls(Optional.empty());
-        conf.setWebServicePortTls(Optional.empty());
-        conf.setClientLibraryVersionCheckEnabled(true);
-        startBroker();
-
         URI brokerServiceUrl = new URI(pulsar.getSafeWebServiceAddress());
 
         URL url = brokerServiceUrl.toURL();
@@ -1116,6 +1141,9 @@ public class BrokerServiceLookupTest extends 
ProducerConsumerBase {
 
     @Test
     public void testLookupConnectionNotCloseIfGetUnloadingExOrMetadataEx() 
throws Exception {
+        if (useTestZookeeper) {
+            throw new SkipException("This test case depends on MockZooKeeper");
+        }
         String tpName = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
         admin.topics().createNonPartitionedTopic(tpName);
         PulsarClientImpl pulsarClientImpl = (PulsarClientImpl) pulsarClient;
@@ -1211,7 +1239,8 @@ public class BrokerServiceLookupTest extends 
ProducerConsumerBase {
         }
     }
 
-    @Test(timeOut = 30000)
+    // TODO: This test is disabled since it's invalid. The test fails for both 
TestZKServer and MockZooKeeper.
+    @Test(timeOut = 30000, enabled = false)
     public void 
testLookupConnectionNotCloseIfFailedToAcquireOwnershipOfBundle() throws 
Exception {
         String tpName = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
         admin.topics().createNonPartitionedTopic(tpName);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerConsumerBase.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerConsumerBase.java
index ef070250ca1..295120bf369 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerConsumerBase.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerConsumerBase.java
@@ -40,7 +40,7 @@ public abstract class ProducerConsumerBase extends 
MockedPulsarServiceBaseTest {
     protected String methodName;
 
     @BeforeMethod(alwaysRun = true)
-    public void beforeMethod(Method m) throws Exception {
+    public void setTestMethodName(Method m) throws Exception {
         methodName = m.getName();
     }
 
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
index 78e9980bc21..21128c77477 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
@@ -71,9 +71,10 @@ import org.apache.zookeeper.client.ConnectStringParser;
 @Slf4j
 public class ZKMetadataStore extends AbstractBatchedMetadataStore
         implements MetadataStoreExtended, MetadataStoreLifecycle {
-
     public static final String ZK_SCHEME = "zk";
     public static final String ZK_SCHEME_IDENTIFIER = "zk:";
+    // ephemeralOwner value for persistent nodes
+    private static final long NOT_EPHEMERAL = 0L;
 
     private final String zkConnectString;
     private final String rootPath;
@@ -128,12 +129,17 @@ public class ZKMetadataStore extends 
AbstractBatchedMetadataStore
     @VisibleForTesting
     @SneakyThrows
     public ZKMetadataStore(ZooKeeper zkc, MetadataStoreConfig config) {
-        super(config);
+        this(zkc, config, false);
+    }
 
+    @VisibleForTesting
+    @SneakyThrows
+    public ZKMetadataStore(ZooKeeper zkc, MetadataStoreConfig config, boolean 
isZkManaged) {
+        super(config);
         this.zkConnectString = null;
         this.rootPath = null;
         this.metadataStoreConfig = null;
-        this.isZkManaged = false;
+        this.isZkManaged = isZkManaged;
         this.zkc = zkc;
         this.sessionWatcher = new ZKSessionWatcher(zkc, 
this::receivedSessionEvent);
         zkc.addWatch("/", this::handleWatchEvent, 
AddWatchMode.PERSISTENT_RECURSIVE);
@@ -477,7 +483,7 @@ public class ZKMetadataStore extends 
AbstractBatchedMetadataStore
 
     private Stat getStat(String path, org.apache.zookeeper.data.Stat zkStat) {
         return new Stat(path, zkStat.getVersion(), zkStat.getCtime(), 
zkStat.getMtime(),
-                zkStat.getEphemeralOwner() != -1,
+                zkStat.getEphemeralOwner() != NOT_EPHEMERAL,
                 zkStat.getEphemeralOwner() == zkc.getSessionId());
     }
 
diff --git 
a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/BookKeeperClusterTestCase.java
 
b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/BookKeeperClusterTestCase.java
index 9a8e3ef5a2d..1e861bbf88c 100644
--- 
a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/BookKeeperClusterTestCase.java
+++ 
b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/BookKeeperClusterTestCase.java
@@ -207,6 +207,7 @@ public abstract class BookKeeperClusterTestCase {
         try {
             // cleanup for metrics.
             metadataStore.close();
+            metadataStore = null;
             stopZKCluster();
         } catch (Exception e) {
             LOG.error("Got Exception while trying to stop ZKCluster", e);
@@ -236,6 +237,9 @@ public abstract class BookKeeperClusterTestCase {
     protected void startZKCluster() throws Exception {
         zkUtil.startCluster();
         zkc = zkUtil.getZooKeeperClient();
+        if (metadataStore != null) {
+            metadataStore.close();
+        }
         metadataStore = new FaultInjectionMetadataStore(
                 
MetadataStoreExtended.create(zkUtil.getZooKeeperConnectString(),
                 MetadataStoreConfig.builder().metadataStoreName("metastore-" + 
getClass().getSimpleName()).build()));
diff --git 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java
 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java
index c419f612735..c4bd1959acc 100644
--- 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java
+++ 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java
@@ -24,11 +24,18 @@ import io.etcd.jetcd.launcher.EtcdCluster;
 import io.etcd.jetcd.test.EtcdClusterExtension;
 import java.io.File;
 import java.net.URI;
+import java.util.Arrays;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletionException;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.metadata.api.MetadataStore;
+import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.MetadataStoreFactory;
+import org.apache.pulsar.metadata.impl.MetadataStoreFactoryImpl;
 import org.apache.pulsar.tests.TestRetrySupport;
 import org.assertj.core.util.Files;
 import org.testng.annotations.AfterClass;
@@ -36,19 +43,45 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.DataProvider;
 
 public abstract class BaseMetadataStoreTest extends TestRetrySupport {
+    // to debug specific implementations, set the TEST_METADATA_PROVIDERS 
environment variable
+    // or temporarily hard code this value in the test class before running 
tests in the IDE
+    // supported values (comma-separated) are ZooKeeper,Memory,RocksDB,Etcd,MockZooKeeper
+    private static final String TEST_METADATA_PROVIDERS = 
System.getenv("TEST_METADATA_PROVIDERS");
+    private static String originalMetadatastoreProvidersPropertyValue;
     protected TestZKServer zks;
     protected EtcdCluster etcdCluster;
-
+    private String mockZkUrl;
+    // reference to keep the MockZooKeeper instance alive in 
MockZooKeeperMetadataStoreProvider
+    private MetadataStore mockZkStoreRef;
+    private String zksConnectionString;
+    private String memoryConnectionString;
+    private String rocksdbConnectionString;
+    private File rocksDbDirectory;
+    private boolean running;
     @BeforeClass(alwaysRun = true)
     @Override
     public void setup() throws Exception {
+        running = true;
         incrementSetupNumber();
         zks = new TestZKServer();
+        zksConnectionString = zks.getConnectionString();
+        memoryConnectionString = "memory:" + UUID.randomUUID();
+        rocksDbDirectory = Files.newTemporaryFolder().getAbsoluteFile();
+        rocksdbConnectionString = "rocksdb:" + rocksDbDirectory;
+        originalMetadatastoreProvidersPropertyValue =
+                
System.getProperty(MetadataStoreFactoryImpl.METADATASTORE_PROVIDERS_PROPERTY);
+        // register MockZooKeeperMetadataStoreProvider
+        
System.setProperty(MetadataStoreFactoryImpl.METADATASTORE_PROVIDERS_PROPERTY,
+                MockZooKeeperMetadataStoreProvider.class.getName());
+        mockZkUrl = "mock-zk:" + UUID.randomUUID();
+        // create a reference in MockZooKeeperMetadataStoreProvider to keep 
the MockZooKeeper instance alive
+        mockZkStoreRef = MetadataStoreFactory.create(mockZkUrl, 
MetadataStoreConfig.builder().build());
     }
 
     @AfterClass(alwaysRun = true)
     @Override
     public void cleanup() throws Exception {
+        running = false;
         markCurrentSetupNumberCleaned();
         if (zks != null) {
             zks.close();
@@ -59,30 +92,71 @@ public abstract class BaseMetadataStoreTest extends 
TestRetrySupport {
             etcdCluster.close();
             etcdCluster = null;
         }
-    }
 
-    private static String createTempFolder() {
-        File temp = Files.newTemporaryFolder();
-        temp.deleteOnExit();
-        return temp.getAbsolutePath();
+        if (mockZkStoreRef != null) {
+            mockZkStoreRef.close();
+            mockZkStoreRef = null;
+            mockZkUrl = null;
+        }
+
+        if (rocksDbDirectory != null) {
+            Files.delete(rocksDbDirectory);
+            rocksDbDirectory = null;
+        }
+
+        if (originalMetadatastoreProvidersPropertyValue != null) {
+            
System.setProperty(MetadataStoreFactoryImpl.METADATASTORE_PROVIDERS_PROPERTY,
+                    originalMetadatastoreProvidersPropertyValue);
+        } else {
+            
System.clearProperty(MetadataStoreFactoryImpl.METADATASTORE_PROVIDERS_PROPERTY);
+        }
     }
 
     @DataProvider(name = "impl")
     public Object[][] implementations() {
+        // If the environment variable TEST_METADATA_PROVIDERS is set, only 
run the specified implementations
+        if (StringUtils.isNotBlank(TEST_METADATA_PROVIDERS)) {
+            return filterImplementations(TEST_METADATA_PROVIDERS.split(","));
+        }
+        return allImplementations();
+    }
+
+    private Object[][] allImplementations() {
         // A Supplier<String> must be used for the Zookeeper connection string 
parameter. The retried test run will
         // use the same arguments as the failed attempt.
         // The Zookeeper test server gets restarted by TestRetrySupport before 
the retry.
         // The new connection string won't be available to the test method 
unless a
         // Supplier<String> lambda is used for providing the value.
         return new Object[][]{
-                {"ZooKeeper", stringSupplier(() -> zks.getConnectionString())},
-                {"Memory", stringSupplier(() -> "memory:" + 
UUID.randomUUID())},
-                {"RocksDB", stringSupplier(() -> "rocksdb:" + 
createTempFolder())},
+                {"ZooKeeper", stringSupplier(() -> zksConnectionString)},
+                {"Memory", stringSupplier(() -> memoryConnectionString)},
+                {"RocksDB", stringSupplier(() -> rocksdbConnectionString)},
                 {"Etcd", stringSupplier(() -> "etcd:" + 
getEtcdClusterConnectString())},
+                {"MockZooKeeper", stringSupplier(() -> mockZkUrl)},
         };
     }
 
+    @DataProvider(name = "distributedImpl")
+    public Object[][] distributedImplementations() {
+        return filterImplementations("ZooKeeper", "Etcd");
+    }
+
+    @DataProvider(name = "zkImpls")
+    public Object[][] zkImplementations() {
+        return filterImplementations("ZooKeeper", "MockZooKeeper");
+    }
+
+    protected Object[][] filterImplementations(String... providers) {
+        Set<String> providersSet = Set.of(providers);
+        return Arrays.stream(allImplementations())
+                .filter(impl -> providersSet.contains(impl[0]))
+                .toArray(Object[][]::new);
+    }
+
     private synchronized String getEtcdClusterConnectString() {
+        if (!running) {
+            return null;
+        }
         if (etcdCluster == null) {
             etcdCluster = 
EtcdClusterExtension.builder().withClusterName("test").withNodes(1).withSsl(false).build()
                     .cluster();
@@ -92,7 +166,26 @@ public abstract class BaseMetadataStoreTest extends 
TestRetrySupport {
     }
 
     public static Supplier<String> stringSupplier(Supplier<String> supplier) {
-        return supplier;
+        return new StringSupplier(supplier);
+    }
+
+    // Implements toString() so that the test name is more descriptive
+    private static class StringSupplier implements Supplier<String> {
+        private final Supplier<String> supplier;
+
+        public StringSupplier(Supplier<String> supplier) {
+            this.supplier = supplier;
+        }
+
+        @Override
+        public String get() {
+            return supplier.get();
+        }
+
+        @Override
+        public String toString() {
+            return get();
+        }
     }
 
     protected String newKey() {
@@ -138,4 +231,15 @@ public abstract class BaseMetadataStoreTest extends 
TestRetrySupport {
         }
         return false;
     }
+
+    /**
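+     * Delete all the empty container nodes; only applies to the "ZooKeeper" provider.
+     * @param provider the name of the metadata store provider under test
+     * @throws Exception if triggering the container check fails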
+     */
+    protected void maybeTriggerDeletingEmptyContainers(String provider) throws 
Exception {
+        if ("ZooKeeper".equals(provider) && zks != null) {
+            zks.checkContainers();
+        }
+    }
 }
diff --git 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/CounterTest.java 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/CounterTest.java
index c5b4012f0c8..bd068539cc5 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/CounterTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/CounterTest.java
@@ -70,6 +70,7 @@ public class CounterTest extends BaseMetadataStoreTest {
             return;
         }
         String metadataUrl = urlSupplier.get();
+        @Cleanup
         MetadataStoreExtended store1 = 
MetadataStoreExtended.create(metadataUrl, 
MetadataStoreConfig.builder().build());
 
         CoordinationService cs1 = new CoordinationServiceImpl(store1);
@@ -85,7 +86,7 @@ public class CounterTest extends BaseMetadataStoreTest {
         store1.close();
 
         // Delete all the empty container nodes
-        zks.checkContainers();
+        maybeTriggerDeletingEmptyContainers(provider);
 
         @Cleanup
         MetadataStoreExtended store2 = 
MetadataStoreExtended.create(metadataUrl, 
MetadataStoreConfig.builder().build());
diff --git 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java
 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java
index 591c0a23a9b..14fe3d35325 100644
--- 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java
+++ 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java
@@ -69,7 +69,6 @@ import org.apache.pulsar.metadata.api.Stat;
 import org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl;
 import org.awaitility.Awaitility;
 import org.mockito.stubbing.Answer;
-import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 @Slf4j
@@ -111,14 +110,7 @@ public class MetadataCacheTest extends 
BaseMetadataStoreTest {
         }
     }
 
-    @DataProvider(name = "zk")
-    public Object[][] zkimplementations() {
-        return new Object[][] {
-            { "ZooKeeper", stringSupplier(() -> zks.getConnectionString()) },
-        };
-    }
-
-    @Test(dataProvider = "zk")
+    @Test(dataProvider = "zkImpls")
     public void crossStoreAddDelete(String provider, Supplier<String> 
urlSupplier) throws Exception {
         @Cleanup
         MetadataStore store1 = MetadataStoreFactory.create(urlSupplier.get(), 
MetadataStoreConfig.builder().build());
@@ -183,7 +175,7 @@ public class MetadataCacheTest extends 
BaseMetadataStoreTest {
         });
     }
 
-    @Test(dataProvider = "zk")
+    @Test(dataProvider = "zkImpls")
     public void crossStoreUpdates(String provider, Supplier<String> 
urlSupplier) throws Exception {
         String testName = "cross store updates";
         @Cleanup
@@ -493,11 +485,10 @@ public class MetadataCacheTest extends 
BaseMetadataStoreTest {
      *
      * @throws Exception
      */
-    @Test
-    public void readModifyUpdateBadVersionRetry() throws Exception {
-        String url = zks.getConnectionString();
+    @Test(dataProvider = "zkImpls")
+    public void readModifyUpdateBadVersionRetry(String provider, 
Supplier<String> urlSupplier) throws Exception {
         @Cleanup
-        MetadataStore store = MetadataStoreFactory.create(url, 
MetadataStoreConfig.builder().build());
+        MetadataStore store = MetadataStoreFactory.create(urlSupplier.get(), 
MetadataStoreConfig.builder().build());
 
         MetadataCache<MyClass> cache = store.getMetadataCache(MyClass.class);
 
@@ -511,7 +502,8 @@ public class MetadataCacheTest extends 
BaseMetadataStoreTest {
         final var sourceStores = new ArrayList<MetadataStore>();
 
         for (int i = 0; i < 20; i++) {
-            final var sourceStore = MetadataStoreFactory.create(url, 
MetadataStoreConfig.builder().build());
+            final var sourceStore =
+                    MetadataStoreFactory.create(urlSupplier.get(), 
MetadataStoreConfig.builder().build());
             sourceStores.add(sourceStore);
             final var objCache = sourceStore.getMetadataCache(MyClass.class);
             futures.add(objCache.readModifyUpdate(key1, v -> new MyClass(v.a, 
v.b + 1)));
@@ -522,11 +514,10 @@ public class MetadataCacheTest extends 
BaseMetadataStoreTest {
         }
     }
 
-    @Test
-    public void readModifyUpdateOrCreateRetryTimeout() throws Exception {
-        String url = zks.getConnectionString();
+    @Test(dataProvider = "zkImpls")
+    public void readModifyUpdateOrCreateRetryTimeout(String provider, 
Supplier<String> urlSupplier) throws Exception {
         @Cleanup
-        MetadataStore store = MetadataStoreFactory.create(url, 
MetadataStoreConfig.builder().build());
+        MetadataStore store = MetadataStoreFactory.create(urlSupplier.get(), 
MetadataStoreConfig.builder().build());
 
         MetadataCache<MyClass> cache = store.getMetadataCache(MyClass.class, 
MetadataCacheConfig.builder()
                 .retryBackoff(new BackoffBuilder()
diff --git 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreExtendedTest.java
 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreExtendedTest.java
index a4c937611fd..30fbd9b836e 100644
--- 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreExtendedTest.java
+++ 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreExtendedTest.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.metadata;
 
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
@@ -70,18 +71,19 @@ public class MetadataStoreExtendedTest extends 
BaseMetadataStoreTest {
     @Test(dataProvider = "impl")
     public void testPersistentOrEphemeralPut(String provider, Supplier<String> 
urlSupplier) throws Exception {
         final String key1 = newKey();
+        @Cleanup
         MetadataStoreExtended store = 
MetadataStoreExtended.create(urlSupplier.get(), 
MetadataStoreConfig.builder().build());
         store.put(key1, "value-1".getBytes(), Optional.empty(), 
EnumSet.noneOf(CreateOption.class)).join();
         var value = store.get(key1).join().get();
         assertEquals(value.getValue(), "value-1".getBytes());
-        // assertFalse(value.getStat().isEphemeral()); // Todo : fix 
zkStat.getEphemeralOwner() != 0 from test zk
+        assertFalse(value.getStat().isEphemeral());
         assertTrue(value.getStat().isFirstVersion());
         var version = value.getStat().getVersion();
 
         store.put(key1, "value-2".getBytes(), Optional.empty(), 
EnumSet.noneOf(CreateOption.class)).join();
         value = store.get(key1).join().get();
         assertEquals(value.getValue(), "value-2".getBytes());
-       //assertFalse(value.getStat().isEphemeral());  // Todo : fix 
zkStat.getEphemeralOwner() != 0 from test zk
+        assertFalse(value.getStat().isEphemeral());
         assertEquals(value.getStat().getVersion(), version + 1);
 
         final String key2 = newKey();
diff --git 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
index c87e9bda436..7b60a6be581 100644
--- 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
+++ 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.metadata;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
@@ -54,6 +55,7 @@ import org.apache.pulsar.metadata.api.Stat;
 import org.apache.pulsar.metadata.impl.ZKMetadataStore;
 import org.assertj.core.util.Lists;
 import org.awaitility.Awaitility;
+import org.testng.SkipException;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
@@ -97,7 +99,7 @@ public class MetadataStoreTest extends BaseMetadataStoreTest {
                 MetadataStoreConfig.builder().fsyncEnable(false).build());
 
         String data = "data";
-        String path = "/non-existing-key";
+        String path = "/concurrentPutTest";
         int concurrent = 50;
         List<CompletableFuture<Stat>> futureList = new ArrayList<>();
         for (int i = 0; i < concurrent; i++) {
@@ -400,6 +402,10 @@ public class MetadataStoreTest extends 
BaseMetadataStoreTest {
 
     @Test(dataProvider = "impl")
     public void testDeleteUnusedDirectories(String provider, Supplier<String> 
urlSupplier) throws Exception {
+        if (provider.equals("MockZooKeeper")) {
+            throw new SkipException("MockZooKeeper does not support 
deleteUnusedDirectories");
+        }
+
         @Cleanup
         MetadataStore store = MetadataStoreFactory.create(urlSupplier.get(),
                 MetadataStoreConfig.builder().fsyncEnable(false).build());
@@ -413,18 +419,18 @@ public class MetadataStoreTest extends 
BaseMetadataStoreTest {
         store.delete(prefix + "/a1/b1/c1", Optional.empty()).join();
         store.delete(prefix + "/a1/b1/c2", Optional.empty()).join();
 
-        zks.checkContainers();
+        maybeTriggerDeletingEmptyContainers(provider);
         assertFalse(store.exists(prefix + "/a1/b1").join());
 
         store.delete(prefix + "/a1/b2/c1", Optional.empty()).join();
 
-        zks.checkContainers();
+        maybeTriggerDeletingEmptyContainers(provider);
         assertFalse(store.exists(prefix + "/a1/b2").join());
 
-        zks.checkContainers();
+        maybeTriggerDeletingEmptyContainers(provider);
         assertFalse(store.exists(prefix + "/a1").join());
 
-        zks.checkContainers();
+        maybeTriggerDeletingEmptyContainers(provider);
         assertFalse(store.exists(prefix).join());
     }
 
@@ -602,21 +608,25 @@ public class MetadataStoreTest extends 
BaseMetadataStoreTest {
         store.put("/b/c/b/1", "value1".getBytes(StandardCharsets.UTF_8), 
Optional.empty()).join();
 
         List<String> subPaths = store.getChildren("/").get();
-        Set<String> expectedSet = "ZooKeeper".equals(provider) ? Set.of("a", 
"b", "zookeeper") : Set.of("a", "b");
+        Set<String> ignoredRootPaths = Set.of("zookeeper");
+        Set<String> expectedSet = Set.of("a", "b");
         for (String subPath : subPaths) {
-            assertTrue(expectedSet.contains(subPath));
+            if (ignoredRootPaths.contains(subPath)) {
+                continue;
+            }
+            assertThat(expectedSet).contains(subPath);
         }
 
         List<String> subPaths2 = store.getChildren("/a").get();
         Set<String> expectedSet2 = Set.of("a-1", "a-2");
         for (String subPath : subPaths2) {
-            assertTrue(expectedSet2.contains(subPath));
+            assertThat(expectedSet2).contains(subPath);
         }
 
         List<String> subPaths3 = store.getChildren("/b").get();
         Set<String> expectedSet3 = Set.of("c");
         for (String subPath : subPaths3) {
-            assertTrue(expectedSet3.contains(subPath));
+            assertThat(expectedSet3).contains(subPath);
         }
     }
 
diff --git 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MockZooKeeperMetadataStoreProvider.java
 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MockZooKeeperMetadataStoreProvider.java
new file mode 100644
index 00000000000..994a97c2b10
--- /dev/null
+++ 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MockZooKeeperMetadataStoreProvider.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.metadata;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.pulsar.metadata.api.MetadataStore;
+import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.api.MetadataStoreProvider;
+import org.apache.pulsar.metadata.impl.ZKMetadataStore;
+import org.apache.zookeeper.MockZooKeeper;
+import org.apache.zookeeper.MockZooKeeperSession;
+
+public class MockZooKeeperMetadataStoreProvider implements 
MetadataStoreProvider {
+    private static final String MOCK_ZK_SCHEME = "mock-zk";
+    private static final ConcurrentMap<String, MockZooKeeper> mockZooKeepers = 
new ConcurrentHashMap<>();
+
+    @Override
+    public String urlScheme() {
+        return MOCK_ZK_SCHEME;
+    }
+
+    @Override
+    public MetadataStore create(String metadataURL, MetadataStoreConfig 
metadataStoreConfig,
+                                boolean enableSessionWatcher) throws 
MetadataStoreException {
+        MockZooKeeper mockZooKeeper = 
mockZooKeepers.computeIfAbsent(metadataURL,
+                k -> MockZooKeeper.newInstance().registerCloseable(() -> 
mockZooKeepers.remove(k)));
+        MockZooKeeperSession mockZooKeeperSession = 
MockZooKeeperSession.newInstance(mockZooKeeper, true);
+        ZKMetadataStore zkMetadataStore = new 
ZKMetadataStore(mockZooKeeperSession, metadataStoreConfig, true);
+        return zkMetadataStore;
+    }
+}
diff --git 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/LedgerManagerIteratorTest.java
 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/LedgerManagerIteratorTest.java
index b64cc964a99..bf67d0218b0 100644
--- 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/LedgerManagerIteratorTest.java
+++ 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/LedgerManagerIteratorTest.java
@@ -373,7 +373,7 @@ public class LedgerManagerIteratorTest extends 
BaseMetadataStoreTest {
         assertEquals(ledgersReadAsync, ids, "Comparing LedgersIds read 
asynchronously");
     }
 
-    @Test(timeOut = 30000, dataProvider = "impl")
+    @Test(timeOut = 60000, dataProvider = "impl")
     public void checkConcurrentModifications(String provider, Supplier<String> 
urlSupplier) throws Throwable {
         @Cleanup
         MetadataStoreExtended store =
@@ -406,14 +406,16 @@ public class LedgerManagerIteratorTest extends 
BaseMetadataStoreTest {
         ExecutorService executor = Executors.newCachedThreadPool();
         final ConcurrentSkipListSet<Long> createdLedgers = new 
ConcurrentSkipListSet<>();
         for (int i = 0; i < numWriters; ++i) {
+            int writerIndex = i;
             Future<?> f = executor.submit(() -> {
                 @Cleanup
                 LedgerManager writerLM = new PulsarLedgerManager(store, 
ledgersRoot);
                 Random writerRNG = new Random(rng.nextLong());
-
+                log.info("Writer {} waiting", writerIndex);
                 latch.await();
-
+                log.info("Writer {} started", writerIndex);
                 while (MathUtils.elapsedNanos(start) < runtime) {
+                    log.info("Writer {} writing", writerIndex);
                     long candidate = 0;
                     do {
                         candidate = Math.abs(writerRNG.nextLong());
@@ -425,18 +427,22 @@ public class LedgerManagerIteratorTest extends 
BaseMetadataStoreTest {
                     createLedger(writerLM, candidate);
                     removeLedger(writerLM, candidate);
                 }
+                log.info("Writer {} finished", writerIndex);
                 return null;
             });
             futures.add(f);
         }
 
         for (int i = 0; i < numCheckers; ++i) {
+            int checkerIndex = i;
             Future<?> f = executor.submit(() -> {
                 @Cleanup
                 LedgerManager checkerLM = new PulsarLedgerManager(store, 
ledgersRoot);
+                log.info("Checker {} waiting", checkerIndex);
                 latch.await();
-
+                log.info("Checker {} started", checkerIndex);
                 while (MathUtils.elapsedNanos(start) < runtime) {
+                    log.info("Checker {} checking", checkerIndex);
                     LedgerRangeIterator lri = checkerLM.getLedgerRanges(0);
                     Set<Long> returnedIds = ledgerRangeToSet(lri);
                     for (long id : mustExist) {
@@ -448,15 +454,19 @@ public class LedgerManagerIteratorTest extends 
BaseMetadataStoreTest {
                         assertTrue(ledgersReadAsync.contains(id));
                     }
                 }
+                log.info("Checker {} finished", checkerIndex);
                 return null;
             });
             futures.add(f);
         }
 
         latch.countDown();
+        log.info("Waiting for futures");
         for (Future<?> f : futures) {
+            log.info("Waiting for future");
             f.get();
         }
+        log.info("Completed");
         executor.shutdownNow();
     }
 
diff --git 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/LedgerUnderreplicationManagerTest.java
 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/LedgerUnderreplicationManagerTest.java
index 0e9c781fb91..ac73491a81c 100644
--- 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/LedgerUnderreplicationManagerTest.java
+++ 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/LedgerUnderreplicationManagerTest.java
@@ -300,10 +300,10 @@ public class LedgerUnderreplicationManagerTest extends 
BaseMetadataStoreTest {
         assertEquals(l, lB.get(), "Should be the ledger I marked");
     }
 
-
-    @Test(timeOut = 10000)
-    public void testZkMetasStoreMarkReplicatedDeleteEmptyParentNodes() throws 
Exception {
-        methodSetup(stringSupplier(() -> zks.getConnectionString()));
+    @Test(dataProvider = "zkImpls", timeOut = 10000)
+    public void testZkMetasStoreMarkReplicatedDeleteEmptyParentNodes(String 
provider, Supplier<String> urlSupplier)
+            throws Exception {
+        methodSetup(urlSupplier);
 
         String missingReplica = "localhost:3181";
 
diff --git 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerIdGeneratorTest.java
 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerIdGeneratorTest.java
index 73d5f451c1f..da3fd7f7bd4 100644
--- 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerIdGeneratorTest.java
+++ 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerIdGeneratorTest.java
@@ -242,7 +242,7 @@ public class PulsarLedgerIdGeneratorTest extends 
BaseMetadataStoreTest {
         l1.await();
         log.info("res1 : {}", res1);
 
-        zks.checkContainers();
+        maybeTriggerDeletingEmptyContainers(provider);
 
         CountDownLatch l2 = new CountDownLatch(1);
         AtomicLong res2 = new AtomicLong();
diff --git 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java
 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java
index c0159be4303..34e860aa578 100644
--- 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java
+++ 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java
@@ -20,6 +20,10 @@ package org.apache.pulsar.metadata.impl;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 import lombok.Cleanup;
 import org.apache.pulsar.metadata.api.GetResult;
 import org.apache.pulsar.metadata.api.MetadataStore;
@@ -31,26 +35,25 @@ import org.apache.pulsar.metadata.api.extended.CreateOption;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
-import java.util.EnumSet;
-import java.util.List;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
 
 public class MetadataStoreFactoryImplTest {
-
-    private static Object originalProperty;
+    private static String originalMetadatastoreProvidersPropertyValue;
 
     @BeforeClass
     public void setMetadataStoreProperty() {
-        originalProperty = 
System.getProperties().get(MetadataStoreFactoryImpl.METADATASTORE_PROVIDERS_PROPERTY);
+        originalMetadatastoreProvidersPropertyValue =
+                
System.getProperty(MetadataStoreFactoryImpl.METADATASTORE_PROVIDERS_PROPERTY);
         
System.setProperty(MetadataStoreFactoryImpl.METADATASTORE_PROVIDERS_PROPERTY,
                 MyMetadataStoreProvider.class.getName());
     }
 
     @AfterClass
     public void resetMetadataStoreProperty() {
-        if (originalProperty != null) {
-            
System.getProperties().put(MetadataStoreFactoryImpl.METADATASTORE_PROVIDERS_PROPERTY,
 originalProperty);
+        if (originalMetadatastoreProvidersPropertyValue != null) {
+            
System.setProperty(MetadataStoreFactoryImpl.METADATASTORE_PROVIDERS_PROPERTY,
+                    originalMetadatastoreProvidersPropertyValue);
+        } else {
+            
System.clearProperty(MetadataStoreFactoryImpl.METADATASTORE_PROVIDERS_PROPERTY);
         }
     }
 
diff --git a/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeper.java 
b/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeper.java
index f32036e53f0..e124699ee13 100644
--- a/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeper.java
+++ b/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeper.java
@@ -19,34 +19,32 @@
 package org.apache.zookeeper;
 
 import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import com.google.common.collect.Multimaps;
 import com.google.common.collect.SetMultimap;
 import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.MoreExecutors;
 import io.netty.util.concurrent.DefaultThreadFactory;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.TreeMap;
-import java.util.TreeSet;
+import java.util.concurrent.Callable;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.BiPredicate;
 import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.zookeeper.AsyncCallback.Children2Callback;
 import org.apache.zookeeper.AsyncCallback.ChildrenCallback;
 import org.apache.zookeeper.AsyncCallback.DataCallback;
@@ -57,6 +55,8 @@ import org.apache.zookeeper.Watcher.Event.EventType;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.proto.DeleteRequest;
+import org.apache.zookeeper.proto.SetDataRequest;
 import org.objenesis.Objenesis;
 import org.objenesis.ObjenesisStd;
 import org.objenesis.instantiator.ObjectInstantiator;
@@ -64,33 +64,79 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class MockZooKeeper extends ZooKeeper {
-    @Data
+    // ephemeralOwner value for persistent nodes
+    private static final long NOT_EPHEMERAL = 0L;
+    private static final String ROOT_PATH = "/";
+
     @AllArgsConstructor
     private static class MockZNode {
         byte[] content;
         int version;
         long ephemeralOwner;
+        long creationTimestamp;
+        long modificationTimestamp;
+        List<String> children;
 
         static MockZNode of(byte[] content, int version, long ephemeralOwner) {
-            return new MockZNode(content, version, ephemeralOwner);
+            return new MockZNode(content, version, ephemeralOwner, System.currentTimeMillis(),
+                    System.currentTimeMillis(), new ArrayList<>());
+        }
+
+        public void updateVersion() {
+            version++;
+            modificationTimestamp = System.currentTimeMillis();
+        }
+
+        public void updateData(byte[] data) {
+            content = data;
+            updateVersion();
+        }
+
+        public Stat getStat() {
+            return applyToStat(new Stat());
+        }
+
+        public Stat applyToStat(Stat stat) {
+            stat.setCtime(creationTimestamp);
+            stat.setMtime(modificationTimestamp);
+            stat.setVersion(version);
+            stat.setEphemeralOwner(ephemeralOwner);
+            return stat;
+        }
+
+        public int getVersion() {
+            return version;
+        }
+
+        public byte[] getContent() {
+            return content;
+        }
+
+        public long getEphemeralOwner() {
+            return ephemeralOwner;
+        }
+
+        public List<String> getChildren() {
+            return children;
         }
     }
 
     private TreeMap<String, MockZNode> tree;
-    private SetMultimap<String, Watcher> watchers;
-    private volatile boolean stopped;
+    private SetMultimap<String, NodeWatcher> watchers;
+    private AtomicBoolean stopped;
     private AtomicReference<KeeperException.Code> alwaysFail;
     private CopyOnWriteArrayList<Failure> failures;
     private ExecutorService executor;
 
-    private Watcher sessionWatcher;
-    private long sessionId = 0L;
+    private volatile Watcher sessionWatcher;
+    private long sessionId = 1L;
     private int readOpDelayMs;
 
-    private ReentrantLock mutex;
-
     private AtomicLong sequentialIdGenerator;
-    private ThreadLocal<Long> epheralOwnerThreadLocal;
+    private ThreadLocal<Long> overriddenSessionIdThreadLocal;
+    private ThreadLocal<Boolean> inExecutorThreadLocal;
+    private int referenceCount;
+    private List<AutoCloseable> closeables;
 
     //see details of Objenesis caching - http://objenesis.org/details.html
     //see supported jvms - https://github.com/easymock/objenesis/blob/master/SupportedJVMs.md
@@ -110,41 +156,21 @@ public class MockZooKeeper extends ZooKeeper {
         }
     }
 
-    @Data
-    @AllArgsConstructor
-    private static class PersistentWatcher {
-        final String path;
-        final Watcher watcher;
-        final AddWatchMode mode;
+    private record PersistentWatcher(String path, Watcher watcher, AddWatchMode mode, long sessionId) {
     }
 
-    private List<PersistentWatcher> persistentWatchers;
-
-    public static MockZooKeeper newInstance() {
-        return newInstance(null);
+    private record NodeWatcher(Watcher watcher, long sessionId) {
     }
 
-    public static MockZooKeeper newInstance(ExecutorService executor) {
-        return newInstance(executor, -1);
-    }
-
-    public static MockZooKeeper newInstanceForGlobalZK(ExecutorService executor) {
-        return newInstanceForGlobalZK(executor, -1);
-    }
+    private List<PersistentWatcher> persistentWatchers;
 
-    public static MockZooKeeper newInstanceForGlobalZK(ExecutorService executor, int readOpDelayMs) {
-        try {
-            return createMockZooKeeperInstance(executor, readOpDelayMs);
-        } catch (RuntimeException e) {
-            throw e;
-        } catch (Exception e) {
-            throw new IllegalStateException("Cannot create object", e);
-        }
+    public static MockZooKeeper newInstance() {
+        return newInstance(-1);
     }
 
-    public static MockZooKeeper newInstance(ExecutorService executor, int readOpDelayMs) {
+    public static MockZooKeeper newInstance(int readOpDelayMs) {
         try {
-            return createMockZooKeeperInstance(executor, readOpDelayMs);
+            return createMockZooKeeperInstance(readOpDelayMs);
         } catch (RuntimeException e) {
             throw e;
         } catch (Exception e) {
@@ -152,29 +178,25 @@ public class MockZooKeeper extends ZooKeeper {
         }
     }
 
-    private static MockZooKeeper createMockZooKeeperInstance(ExecutorService executor, int readOpDelayMs) {
+    private static MockZooKeeper createMockZooKeeperInstance(int readOpDelayMs) {
         ObjectInstantiator<MockZooKeeper> mockZooKeeperInstantiator =
                 objenesis.getInstantiatorOf(MockZooKeeper.class);
         MockZooKeeper zk = mockZooKeeperInstantiator.newInstance();
-        zk.epheralOwnerThreadLocal = new ThreadLocal<>();
-        zk.init(executor);
+        zk.overriddenSessionIdThreadLocal = new ThreadLocal<>();
+        zk.inExecutorThreadLocal = ThreadLocal.withInitial(() -> false);
+        zk.init();
         zk.readOpDelayMs = readOpDelayMs;
-        zk.mutex = new ReentrantLock();
-        zk.lockInstance = ThreadLocal.withInitial(zk::createLock);
         zk.sequentialIdGenerator = new AtomicLong();
+        zk.closeables = new ArrayList<>();
         return zk;
     }
 
-    private void init(ExecutorService executor) {
+    private void init() {
         tree = Maps.newTreeMap();
-        if (executor != null) {
-            this.executor = executor;
-        } else {
-            this.executor = Executors.newFixedThreadPool(1, new DefaultThreadFactory("mock-zookeeper"));
-        }
-        SetMultimap<String, Watcher> w = HashMultimap.create();
-        watchers = Multimaps.synchronizedSetMultimap(w);
-        stopped = false;
+        tree.put(ROOT_PATH, MockZNode.of(new byte[0], 0, NOT_EPHEMERAL));
+        this.executor = Executors.newSingleThreadExecutor(new DefaultThreadFactory("mock-zookeeper"));
+        watchers = HashMultimap.create();
+        stopped = new AtomicBoolean(false);
         alwaysFail = new AtomicReference<>(KeeperException.Code.OK);
         failures = new CopyOnWriteArrayList<>();
         persistentWatchers = new ArrayList<>();
@@ -197,101 +219,143 @@ public class MockZooKeeper extends ZooKeeper {
         return States.CONNECTED;
     }
 
+    @Override
+    public void register(Watcher watcher) {
+        sessionWatcher = watcher;
+    }
 
-    @Slf4j
-    private static class SingleAcquireAndReleaseLock {
-        private final AtomicBoolean acquired = new AtomicBoolean(false);
-        private final Lock lock;
+    @Override
+    public String create(String path, byte[] data, List<ACL> acl, CreateMode createMode)
+            throws KeeperException, InterruptedException {
+        return runInExecutorReturningValue(() -> internalCreate(path, data, createMode));
+    }
 
-        SingleAcquireAndReleaseLock(Lock lock) {
-            this.lock = lock;
+    private <T> T runInExecutorReturningValue(Callable<T> task)
+            throws InterruptedException, KeeperException {
+        if (isStopped()) {
+            throw new KeeperException.ConnectionLossException();
         }
-
-        public void lock() {
-            if (acquired.compareAndSet(false, true)) {
-                lock.lock();
-            } else {
-                throw new IllegalStateException("Lock was already acquired!");
+        if (inExecutorThreadLocal.get()) {
+            try {
+                return task.call();
+            } catch (Exception e) {
+                if (e instanceof KeeperException ke) {
+                    throw ke;
+                }
+                if (e instanceof InterruptedException ie) {
+                    throw ie;
+                }
+                log.error("Unexpected exception", e);
+                throw new KeeperException.SystemErrorException();
             }
         }
-
-        public void unlockIfNeeded() {
-            if (acquired.compareAndSet(true, false)) {
-                lock.unlock();
+        try {
+            long currentSessionId = getSessionId();
+            return executor.submit(() -> {
+                inExecutorThreadLocal.set(true);
+                overrideSessionId(currentSessionId);
+                try {
+                    return task.call();
+                } finally {
+                    removeSessionIdOverride();
+                    inExecutorThreadLocal.set(false);
+                }
+            }).get();
+        } catch (ExecutionException e) {
+            Throwable cause = e.getCause();
+            if (cause instanceof KeeperException ke) {
+                throw ke;
+            }
+            if (cause instanceof InterruptedException ie) {
+                throw ie;
             }
+            log.error("Unexpected exception", e);
+            throw new KeeperException.SystemErrorException();
         }
     }
 
-    private ThreadLocal<SingleAcquireAndReleaseLock> lockInstance;
-
-    private SingleAcquireAndReleaseLock createLock() {
-        return new SingleAcquireAndReleaseLock(mutex);
-    }
-
-    private void lock() {
-        lockInstance.get().lock();
-    }
-
-    private void unlockIfLocked() {
-        lockInstance.get().unlockIfNeeded();
+    private void runInExecutorAsync(Runnable runnable) {
+        if (isStopped()) {
+            throw new RejectedExecutionException("MockZooKeeper is stopped");
+        }
+        if (inExecutorThreadLocal.get()) {
+            try {
+                runnable.run();
+            } catch (Throwable t) {
+                log.error("Unexpected exception", t);
+            }
+            return;
+        }
+        long currentSessionId = getSessionId();
+        executor.submit(() -> {
+            try {
+                inExecutorThreadLocal.set(true);
+                overrideSessionId(currentSessionId);
+                try {
+                    runnable.run();
+                } finally {
+                    removeSessionIdOverride();
+                    inExecutorThreadLocal.set(false);
+                }
+            } catch (Throwable t) {
+                log.error("Unexpected exception", t);
+            }
+        });
     }
 
-    @Override
-    public void register(Watcher watcher) {
-        lock();
-        sessionWatcher = watcher;
-        unlockIfLocked();
+    private void runInExecutorSync(Runnable runnable) {
+        try {
+            runInExecutorReturningValue(() -> {
+                runnable.run();
+                return null;
+            });
+        } catch (Exception e) {
+            log.error("Unexpected error", e);
+        }
     }
 
-    @Override
-    public String create(String path, byte[] data, List<ACL> acl, CreateMode createMode)
-            throws KeeperException, InterruptedException {
+    private String internalCreate(String path, byte[] data, CreateMode createMode) throws KeeperException {
         final Set<Watcher> toNotifyCreate = Sets.newHashSet();
         final Set<Watcher> toNotifyParent = Sets.newHashSet();
-        final String parent = path.substring(0, path.lastIndexOf("/"));
-
-        lock();
-        try {
-
+        final String parent = getParentName(path);
 
-            maybeThrowProgrammedFailure(Op.CREATE, path);
+        maybeThrowProgrammedFailure(Op.CREATE, path);
 
-            if (stopped) {
-                throw new KeeperException.ConnectionLossException();
-            }
-
-            if (tree.containsKey(path)) {
-                throw new KeeperException.NodeExistsException(path);
-            }
+        if (isStopped()) {
+            throw new KeeperException.ConnectionLossException();
+        }
 
-            if (!parent.isEmpty() && !tree.containsKey(parent)) {
-                throw new KeeperException.NoNodeException();
-            }
+        if (tree.containsKey(path)) {
+            throw new KeeperException.NodeExistsException(path);
+        }
 
-            if (createMode.isSequential()) {
-                MockZNode parentNode = tree.get(parent);
-                int parentVersion = tree.get(parent).getVersion();
-                path = path + parentVersion;
+        MockZNode parentNode = tree.get(parent);
 
-                // Update parent version
-                tree.put(parent,
-                        MockZNode.of(parentNode.getContent(), parentVersion + 1, parentNode.getEphemeralOwner()));
-            }
+        if (parentNode == null) {
+            throw new KeeperException.NoNodeException(parent);
+        }
 
-            tree.put(path, MockZNode.of(data, 0, createMode.isEphemeral() ? getEphemeralOwner() : -1L));
+        if (createMode.isSequential()) {
+            int parentVersion = parentNode.getVersion();
+            path = path + parentVersion;
+            parentNode.updateVersion();
+        }
 
-            toNotifyCreate.addAll(watchers.get(path));
+        parentNode.getChildren().add(getNodeName(path));
+        tree.put(path, createMockZNode(data, createMode));
 
-            if (!parent.isEmpty()) {
-                toNotifyParent.addAll(watchers.get(parent));
-            }
-            watchers.removeAll(path);
-        } finally {
-            unlockIfLocked();
+        toNotifyCreate.addAll(getWatchers(path));
+        if (!ROOT_PATH.equals(parent)) {
+            toNotifyParent.addAll(getWatchers(parent));
         }
+        watchers.removeAll(path);
 
         final String finalPath = path;
         executor.execute(() -> {
+            if (isStopped()) {
+                return;
+            }
+
             triggerPersistentWatches(finalPath, parent, EventType.NodeCreated);
 
             toNotifyCreate.forEach(
@@ -309,43 +373,62 @@ public class MockZooKeeper extends ZooKeeper {
         return path;
     }
 
-    protected long getEphemeralOwner() {
-        Long epheralOwner = epheralOwnerThreadLocal.get();
-        if (epheralOwner != null) {
-            return epheralOwner;
+    private static String getParentName(String path) {
+        String parentName = path.substring(0, path.lastIndexOf('/'));
+        return parentName.length() > 0 ? parentName : "/";
+    }
+
+    private static String getNodeName(String path) {
+        return path.substring(path.lastIndexOf('/') + 1);
+    }
+
+    private Collection<Watcher> getWatchers(String path) {
+        Set<NodeWatcher> nodeWatchers = watchers.get(path);
+        if (nodeWatchers != null) {
+            return nodeWatchers.stream().map(NodeWatcher::watcher).toList();
+        } else {
+            return Collections.emptyList();
         }
-        return getSessionId();
     }
 
-    public void overrideEpheralOwner(long epheralOwner) {
-        epheralOwnerThreadLocal.set(epheralOwner);
+    @Override
+    public long getSessionId() {
+        Long overriddenSessionId = overriddenSessionIdThreadLocal.get();
+        if (overriddenSessionId != null) {
+            return overriddenSessionId;
+        }
+        return sessionId;
+    }
+
+    public void overrideSessionId(long sessionId) {
+        overriddenSessionIdThreadLocal.set(sessionId);
     }
 
-    public void removeEpheralOwnerOverride() {
-        epheralOwnerThreadLocal.remove();
+    public void removeSessionIdOverride() {
+        overriddenSessionIdThreadLocal.remove();
     }
 
     @Override
     public void create(final String path, final byte[] data, final List<ACL> acl, CreateMode createMode,
                        final StringCallback cb, final Object ctx) {
-
-
-        executor.execute(() -> {
+        if (isStopped()) {
+            cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, null);
+            return;
+        }
+        runInExecutorAsync(() -> {
             try {
-                lock();
-
-                if (stopped) {
+                if (isStopped()) {
                     cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, null);
                     return;
                 }
 
                 final Set<Watcher> toNotifyCreate = Sets.newHashSet();
-                toNotifyCreate.addAll(watchers.get(path));
+                toNotifyCreate.addAll(getWatchers(path));
 
                 final Set<Watcher> toNotifyParent = Sets.newHashSet();
-                final String parent = path.substring(0, path.lastIndexOf("/"));
-                if (!parent.isEmpty()) {
-                    toNotifyParent.addAll(watchers.get(parent));
+                final String parent = getParentName(path);
+                if (!ROOT_PATH.equals(parent)) {
+                    toNotifyParent.addAll(getWatchers(parent));
                 }
 
                 final String name;
@@ -357,355 +440,247 @@ public class MockZooKeeper extends ZooKeeper {
 
                 Optional<KeeperException.Code> failure = 
programmedFailure(Op.CREATE, path);
                 if (failure.isPresent()) {
-                    unlockIfLocked();
                     cb.processResult(failure.get().intValue(), path, ctx, 
null);
-                } else if (stopped) {
-                    unlockIfLocked();
+                } else if (isStopped()) {
                     
cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, 
null);
                 } else if (tree.containsKey(path)) {
-                    unlockIfLocked();
                     
cb.processResult(KeeperException.Code.NODEEXISTS.intValue(), path, ctx, null);
-                } else if (!parent.isEmpty() && !tree.containsKey(parent)) {
-                    unlockIfLocked();
-                    toNotifyParent.forEach(watcher -> watcher
-                            .process(new 
WatchedEvent(EventType.NodeChildrenChanged, KeeperState.SyncConnected,
-                                    parent)));
+                } else if (!tree.containsKey(parent)) {
+                    runNotifications(() -> {
+                        toNotifyParent.forEach(watcher -> watcher
+                                .process(new 
WatchedEvent(EventType.NodeChildrenChanged, KeeperState.SyncConnected,
+                                        parent)));
+                    });
                     cb.processResult(KeeperException.Code.NONODE.intValue(), 
path, ctx, null);
                 } else {
-                    tree.put(name, MockZNode.of(data, 0,
-                            createMode != null && createMode.isEphemeral() ? 
getEphemeralOwner() : -1L));
+                    tree.get(parent).getChildren().add(getNodeName(name));
+                    tree.put(name, createMockZNode(data, createMode));
                     watchers.removeAll(name);
-                    unlockIfLocked();
                     cb.processResult(0, path, ctx, name);
-
-                    triggerPersistentWatches(path, parent, 
EventType.NodeCreated);
-
-                    toNotifyCreate.forEach(
-                            watcher -> watcher.process(
-                                    new WatchedEvent(EventType.NodeCreated,
-                                            KeeperState.SyncConnected,
-                                            name)));
-                    toNotifyParent.forEach(
-                            watcher -> watcher.process(
-                                    new 
WatchedEvent(EventType.NodeChildrenChanged,
-                                            KeeperState.SyncConnected,
-                                            parent)));
+                    runNotifications(() -> {
+                        triggerPersistentWatches(path, parent, 
EventType.NodeCreated);
+
+                        toNotifyCreate.forEach(
+                                watcher -> watcher.process(
+                                        new WatchedEvent(EventType.NodeCreated,
+                                                KeeperState.SyncConnected,
+                                                name)));
+                        toNotifyParent.forEach(
+                                watcher -> watcher.process(
+                                        new 
WatchedEvent(EventType.NodeChildrenChanged,
+                                                KeeperState.SyncConnected,
+                                                parent)));
+                    });
                 }
             } catch (Throwable ex) {
                 log.error("create path : {} error", path, ex);
                 cb.processResult(KeeperException.Code.SYSTEMERROR.intValue(), 
path, ctx, null);
-            } finally {
-                unlockIfLocked();
             }
         });
+    }
+
+    public void runNotifications(Runnable runnable) {
+        executor.execute(() -> {
+            if (isStopped()) {
+                return;
+            }
+            runnable.run();
+        });
+    }
 
+    private boolean isStopped() {
+        return stopped.get();
+    }
+
+    private MockZNode createMockZNode(byte[] data, CreateMode createMode) {
+        return MockZNode.of(data, 0,
+                createMode != null && createMode.isEphemeral() ? getSessionId() : NOT_EPHEMERAL);
     }
 
     @Override
-    public byte[] getData(String path, Watcher watcher, Stat stat) throws KeeperException {
-        lock();
-        try {
-            maybeThrowProgrammedFailure(Op.GET, path);
-            MockZNode value = tree.get(path);
-            if (value == null) {
-                throw new KeeperException.NoNodeException(path);
-            } else {
-                if (watcher != null) {
-                    watchers.put(path, watcher);
-                }
-                if (stat != null) {
-                    applyToStat(value, stat);
-                }
-                return value.getContent();
+    public byte[] getData(String path, Watcher watcher, Stat stat) throws KeeperException, InterruptedException {
+        return runInExecutorReturningValue(() -> internalGetData(path, watcher, stat));
+    }
+
+    private byte[] internalGetData(String path, Watcher watcher, Stat stat) throws KeeperException {
+        maybeThrowProgrammedFailure(Op.GET, path);
+        MockZNode value = tree.get(path);
+        if (value == null) {
+            throw new KeeperException.NoNodeException(path);
+        } else {
+            if (watcher != null) {
+                watchers.put(path, new NodeWatcher(watcher, getSessionId()));
             }
-        } finally {
-            unlockIfLocked();
+            if (stat != null) {
+                value.applyToStat(stat);
+            }
+            return value.getContent();
         }
     }
 
     @Override
     public void getData(final String path, boolean watch, final DataCallback cb, final Object ctx) {
-        executor.execute(() -> {
-            try {
-                checkReadOpDelay();
-                Optional<KeeperException.Code> failure = 
programmedFailure(Op.GET, path);
-                if (failure.isPresent()) {
-                    cb.processResult(failure.get().intValue(), path, ctx, 
null, null);
-                    return;
-                } else if (stopped) {
-                    
cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, 
null, null);
-                    return;
-                }
-
-                MockZNode value;
-                lock();
-                try {
-                    value = tree.get(path);
-                } finally {
-                    unlockIfLocked();
-                }
-
-                if (value == null) {
-                    cb.processResult(KeeperException.Code.NONODE.intValue(), 
path, ctx, null, null);
-                } else {
-                    cb.processResult(0, path, ctx, value.getContent(), 
createStatForZNode(value));
-                }
-            } catch (Throwable ex) {
-                log.error("get data : {} error", path, ex);
-                cb.processResult(KeeperException.Code.SYSTEMERROR.intValue(), 
path, ctx, null, null);
-            }
-        });
+        getData(path, null, cb, ctx);
     }
 
     @Override
     public void getData(final String path, final Watcher watcher, final DataCallback cb, final Object ctx) {
-        executor.execute(() -> {
+        if (isStopped()) {
+            cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, null, null);
+            return;
+        }
+        runInExecutorAsync(() -> {
             checkReadOpDelay();
             try {
-                lock();
                 Optional<KeeperException.Code> failure = 
programmedFailure(Op.GET, path);
                 if (failure.isPresent()) {
-                    unlockIfLocked();
                     cb.processResult(failure.get().intValue(), path, ctx, 
null, null);
                     return;
-                } else if (stopped) {
-                    unlockIfLocked();
+                } else if (isStopped()) {
                     
cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, 
null, null);
                     return;
                 }
 
                 MockZNode value = tree.get(path);
                 if (value == null) {
-                    unlockIfLocked();
                     cb.processResult(KeeperException.Code.NONODE.intValue(), 
path, ctx, null, null);
                 } else {
                     if (watcher != null) {
-                        watchers.put(path, watcher);
+                        watchers.put(path, new NodeWatcher(watcher, 
getSessionId()));
                     }
-
-                    Stat stat = createStatForZNode(value);
-                    unlockIfLocked();
+                    Stat stat = value.getStat();
                     cb.processResult(0, path, ctx, value.getContent(), stat);
                 }
             } catch (Throwable ex) {
                 log.error("get data : {} error", path, ex);
                 cb.processResult(KeeperException.Code.SYSTEMERROR.intValue(), 
path, ctx, null, null);
-            } finally {
-                unlockIfLocked();
             }
         });
     }
 
     @Override
     public void getChildren(final String path, final Watcher watcher, final ChildrenCallback cb, final Object ctx) {
-        executor.execute(() -> {
-            List<String> children = Lists.newArrayList();
+        if (isStopped()) {
+            cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, null);
+            return;
+        }
+        runInExecutorAsync(() -> {
             try {
-                lock();
                 Optional<KeeperException.Code> failure = 
programmedFailure(Op.GET_CHILDREN, path);
                 if (failure.isPresent()) {
-                    unlockIfLocked();
                     cb.processResult(failure.get().intValue(), path, ctx, 
null);
                     return;
-                } else if (stopped) {
-                    unlockIfLocked();
+                } else if (isStopped()) {
                     
cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, 
null);
                     return;
                 }
 
                 if (!tree.containsKey(path)) {
-                    unlockIfLocked();
                     cb.processResult(KeeperException.Code.NONODE.intValue(), 
path, ctx, null);
                     return;
                 }
 
-                for (String item : tree.tailMap(path).keySet()) {
-                    if (!item.startsWith(path)) {
-                        break;
-                    } else {
-                        if (path.length() >= item.length()) {
-                            continue;
-                        }
-
-                        String child = item.substring(path.length() + 1);
-                        if (item.charAt(path.length()) == '/' && 
!child.contains("/")) {
-                            children.add(child);
-                        }
-                    }
-                }
-
+                List<String> children = findFirstLevelChildren(path);
                 if (watcher != null) {
-                    watchers.put(path, watcher);
+                    watchers.put(path, new NodeWatcher(watcher, 
getSessionId()));
                 }
                 cb.processResult(0, path, ctx, children);
             } catch (Throwable ex) {
                 log.error("get children : {} error", path, ex);
                 cb.processResult(KeeperException.Code.SYSTEMERROR.intValue(), 
path, ctx, null);
-            } finally {
-                unlockIfLocked();
             }
-
         });
     }
 
     @Override
-    public List<String> getChildren(String path, Watcher watcher) throws KeeperException {
-        lock();
-        try {
-            maybeThrowProgrammedFailure(Op.GET_CHILDREN, path);
-
-            if (!tree.containsKey(path)) {
-                throw new KeeperException.NoNodeException();
-            }
-
-            String firstKey = path.equals("/") ? path : path + "/";
-            String lastKey = path.equals("/") ? "0" : path + "0"; // '0' is 
lexicographically just after '/'
+    public List<String> getChildren(String path, Watcher watcher) throws KeeperException, InterruptedException {
+        return runInExecutorReturningValue(() -> internalGetChildren(path, watcher));
+    }
 
-            Set<String> children = new TreeSet<>();
-            tree.subMap(firstKey, false, lastKey, false).forEach((key, value) 
-> {
-                String relativePath = key.replace(firstKey, "");
+    private List<String> internalGetChildren(String path, Watcher watcher) throws KeeperException {
+        maybeThrowProgrammedFailure(Op.GET_CHILDREN, path);
 
-                // Only return first-level children
-                String child = relativePath.split("/", 2)[0];
-                children.add(child);
-            });
-
-            if (watcher != null) {
-                watchers.put(path, watcher);
-            }
+        if (!tree.containsKey(path)) {
+            throw new KeeperException.NoNodeException(path);
+        }
 
-            return new ArrayList<>(children);
-        } finally {
-            unlockIfLocked();
+        if (watcher != null) {
+            watchers.put(path, new NodeWatcher(watcher, getSessionId()));
         }
+
+        return findFirstLevelChildren(path);
     }
 
     @Override
     public List<String> getChildren(String path, boolean watch) throws 
KeeperException, InterruptedException {
-        lock();
-        try {
-            maybeThrowProgrammedFailure(Op.GET_CHILDREN, path);
-
-            if (stopped) {
-                throw new KeeperException.ConnectionLossException();
-            } else if (!tree.containsKey(path)) {
-                throw new KeeperException.NoNodeException();
-            }
-
-            String firstKey = path.equals("/") ? path : path + "/";
-            String lastKey = path.equals("/") ? "0" : path + "0"; // '0' is 
lexicographically just after '/'
-
-            Set<String> children = new TreeSet<>();
-            tree.subMap(firstKey, false, lastKey, false).forEach((key, value) 
-> {
-                String relativePath = key.replace(firstKey, "");
-
-                // Only return first-level children
-                String child = relativePath.split("/", 2)[0];
-                children.add(child);
-            });
-
-            return new ArrayList<>(children);
-        } finally {
-            unlockIfLocked();
-        }
+        return getChildren(path, null);
     }
 
     @Override
     public void getChildren(final String path, boolean watcher, final 
Children2Callback cb, final Object ctx) {
-        executor.execute(() -> {
-            Set<String> children = new TreeSet<>();
+        if (isStopped()) {
+            cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), 
path, ctx, null, null);
+            return;
+        }
+        runInExecutorAsync(() -> {
             try {
-                lock();
+                MockZNode mockZNode = tree.get(path);
+                Stat stat = mockZNode != null ? mockZNode.getStat() : null;
                 Optional<KeeperException.Code> failure = 
programmedFailure(Op.GET_CHILDREN, path);
                 if (failure.isPresent()) {
-                    unlockIfLocked();
                     cb.processResult(failure.get().intValue(), path, ctx, 
null, null);
                     return;
-                } else if (stopped) {
-                    unlockIfLocked();
+                } else if (isStopped()) {
                     
cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, 
null, null);
                     return;
-                } else if (!tree.containsKey(path)) {
-                    unlockIfLocked();
+                } else if (mockZNode == null) {
                     cb.processResult(KeeperException.Code.NONODE.intValue(), 
path, ctx, null, null);
                     return;
                 }
 
-                String firstKey = path.equals("/") ? path : path + "/";
-                String lastKey = path.equals("/") ? "0" : path + "0"; // '0' 
is lexicographically just after '/'
-
-                tree.subMap(firstKey, false, lastKey, false).forEach((key, 
value) -> {
-                    String relativePath = key.replace(firstKey, "");
-
-                    // Only return first-level children
-                    String child = relativePath.split("/", 2)[0];
-                    children.add(child);
-                });
-                cb.processResult(0, path, ctx, new ArrayList<>(children), new 
Stat());
+                List<String> children = findFirstLevelChildren(path);
+                cb.processResult(0, path, ctx, children, stat);
             } catch (Throwable ex) {
                 log.error("get children : {} error", path, ex);
                 cb.processResult(KeeperException.Code.SYSTEMERROR.intValue(), 
path, ctx, null, null);
-            } finally {
-                unlockIfLocked();
             }
         });
+    }
 
+    private List<String> findFirstLevelChildren(String path) {
+        return new ArrayList<>(tree.get(path).getChildren());
+    }
+
+    private boolean hasChildren(String path) {
+        return !tree.get(path).getChildren().isEmpty();
     }
 
     @Override
     public Stat exists(String path, boolean watch) throws KeeperException, 
InterruptedException {
-        lock();
-        try {
-            maybeThrowProgrammedFailure(Op.EXISTS, path);
+        return runInExecutorReturningValue(() -> internalGetStat(path, null));
+    }
 
-            if (stopped) {
-                throw new KeeperException.ConnectionLossException();
-            }
+    private Stat internalGetStat(String path, Watcher watcher) throws 
KeeperException {
+        maybeThrowProgrammedFailure(Op.EXISTS, path);
 
-            if (tree.containsKey(path)) {
-                return createStatForZNode(tree.get(path));
-            } else {
-                return null;
-            }
-        } finally {
-            unlockIfLocked();
+        if (isStopped()) {
+            throw new KeeperException.ConnectionLossException();
         }
-    }
 
-    private static Stat createStatForZNode(MockZNode zNode) {
-        return applyToStat(zNode, new Stat());
-    }
+        if (watcher != null) {
+            watchers.put(path, new NodeWatcher(watcher, getSessionId()));
+        }
 
-    private static Stat applyToStat(MockZNode zNode, Stat stat) {
-        stat.setVersion(zNode.getVersion());
-        if (zNode.getEphemeralOwner() != -1L) {
-            stat.setEphemeralOwner(zNode.getEphemeralOwner());
+        if (tree.containsKey(path)) {
+            return tree.get(path).getStat();
+        } else {
+            return null;
         }
-        return stat;
     }
 
     @Override
     public Stat exists(String path, Watcher watcher) throws KeeperException, 
InterruptedException {
-        lock();
-        try {
-            maybeThrowProgrammedFailure(Op.EXISTS, path);
-
-            if (stopped) {
-                throw new KeeperException.ConnectionLossException();
-            }
-
-            if (watcher != null) {
-                watchers.put(path, watcher);
-            }
-
-            if (tree.containsKey(path)) {
-                return createStatForZNode(tree.get(path));
-            } else {
-                return null;
-            }
-        } finally {
-            unlockIfLocked();
-        }
+        return runInExecutorReturningValue(() -> internalGetStat(path, 
watcher));
     }
 
     @Override
@@ -715,160 +690,149 @@ public class MockZooKeeper extends ZooKeeper {
 
     @Override
     public void exists(String path, Watcher watcher, StatCallback cb, Object 
ctx) {
-        executor.execute(() -> {
+        if (isStopped()) {
+            cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), 
path, ctx, null);
+            return;
+        }
+        runInExecutorAsync(() -> {
             try {
-                lock();
                 Optional<KeeperException.Code> failure = 
programmedFailure(Op.EXISTS, path);
                 if (failure.isPresent()) {
-                    unlockIfLocked();
                     cb.processResult(failure.get().intValue(), path, ctx, 
null);
                     return;
-                } else if (stopped) {
-                    unlockIfLocked();
+                } else if (isStopped()) {
                     
cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, 
null);
                     return;
                 }
 
                 if (watcher != null) {
-                    watchers.put(path, watcher);
+                    watchers.put(path, new NodeWatcher(watcher, 
getSessionId()));
                 }
 
-                if (tree.containsKey(path)) {
-                    unlockIfLocked();
-                    cb.processResult(0, path, ctx, new Stat());
+                MockZNode mockZNode = tree.get(path);
+                if (mockZNode != null) {
+                    Stat stat = mockZNode.getStat();
+                    cb.processResult(0, path, ctx, stat);
                 } else {
-                    unlockIfLocked();
                     cb.processResult(KeeperException.Code.NONODE.intValue(), 
path, ctx, null);
                 }
             } catch (Throwable ex) {
                 log.error("exist : {} error", path, ex);
                 cb.processResult(KeeperException.Code.SYSTEMERROR.intValue(), 
path, ctx, null);
-            } finally {
-                unlockIfLocked();
             }
         });
     }
 
     @Override
     public void sync(String path, VoidCallback cb, Object ctx) {
-        executor.execute(() -> {
+        if (isStopped()) {
+            cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), 
path, ctx);
+            return;
+        }
+        runInExecutorAsync(() -> {
             Optional<KeeperException.Code> failure = 
programmedFailure(Op.SYNC, path);
             if (failure.isPresent()) {
                 cb.processResult(failure.get().intValue(), path, ctx);
                 return;
-            } else if (stopped) {
+            } else if (isStopped()) {
                 
cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx);
                 return;
             }
-
             cb.processResult(0, path, ctx);
         });
-
     }
 
     @Override
     public Stat setData(final String path, byte[] data, int version) throws 
KeeperException, InterruptedException {
-        final Set<Watcher> toNotify = Sets.newHashSet();
-        MockZNode newZNode;
-
-        lock();
-        try {
-            maybeThrowProgrammedFailure(Op.SET, path);
-
-            if (stopped) {
-                throw new KeeperException.ConnectionLossException();
-            }
+        return runInExecutorReturningValue(() ->  internalSetData(path, data, 
version));
+    }
 
-            if (!tree.containsKey(path)) {
-                throw new KeeperException.NoNodeException();
-            }
+    private Stat internalSetData(String path, byte[] data, int version) throws 
KeeperException {
+        final Set<Watcher> toNotify = Sets.newHashSet();
+        maybeThrowProgrammedFailure(Op.SET, path);
 
-            MockZNode mockZNode = tree.get(path);
-            int currentVersion = mockZNode.getVersion();
+        if (isStopped()) {
+            throw new KeeperException.ConnectionLossException();
+        }
 
-            // Check version
-            if (version != -1 && version != currentVersion) {
-                throw new KeeperException.BadVersionException(path);
-            }
+        if (!tree.containsKey(path)) {
+            throw new KeeperException.NoNodeException(path);
+        }
 
-            log.debug("[{}] Updating -- current version: {}", path, 
currentVersion);
-            newZNode = MockZNode.of(data, currentVersion + 1, 
mockZNode.getEphemeralOwner());
-            tree.put(path, newZNode);
+        MockZNode mockZNode = tree.get(path);
+        int currentVersion = mockZNode.getVersion();
 
-            toNotify.addAll(watchers.get(path));
-            watchers.removeAll(path);
-        } finally {
-            unlockIfLocked();
+        // Check version
+        if (version != -1 && version != currentVersion) {
+            throw new KeeperException.BadVersionException(path);
         }
 
-        executor.execute(() -> {
+        log.debug("[{}] Updating -- current version: {}", path, 
currentVersion);
+        mockZNode.updateData(data);
+        Stat stat = mockZNode.getStat();
+        toNotify.addAll(getWatchers(path));
+        watchers.removeAll(path);
+
+        runNotifications(() -> {
             triggerPersistentWatches(path, null, EventType.NodeDataChanged);
 
             toNotify.forEach(watcher -> watcher
                     .process(new WatchedEvent(EventType.NodeDataChanged, 
KeeperState.SyncConnected, path)));
         });
 
-        return createStatForZNode(newZNode);
+        return stat;
     }
 
     @Override
     public void setData(final String path, final byte[] data, int version, 
final StatCallback cb, final Object ctx) {
-        if (stopped) {
+        if (isStopped()) {
             cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), 
path, ctx, null);
             return;
         }
-
-        executor.execute(() -> {
+        runInExecutorAsync(() -> {
             try {
                 final Set<Watcher> toNotify = Sets.newHashSet();
                 Stat stat;
-                lock();
-                try {
-                    Optional<KeeperException.Code> failure = 
programmedFailure(Op.SET, path);
-                    if (failure.isPresent()) {
-                        unlockIfLocked();
-                        cb.processResult(failure.get().intValue(), path, ctx, 
null);
-                        return;
-                    } else if (stopped) {
-                        unlockIfLocked();
-                        
cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, 
null);
-                        return;
-                    }
-
-                    if (!tree.containsKey(path)) {
-                        unlockIfLocked();
-                        
cb.processResult(KeeperException.Code.NONODE.intValue(), path, ctx, null);
-                        return;
-                    }
+                Optional<KeeperException.Code> failure = 
programmedFailure(Op.SET, path);
+                if (failure.isPresent()) {
+                    cb.processResult(failure.get().intValue(), path, ctx, 
null);
+                    return;
+                } else if (isStopped()) {
+                    
cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, 
null);
+                    return;
+                }
 
-                    MockZNode mockZNode = tree.get(path);
-                    int currentVersion = mockZNode.getVersion();
+                if (!tree.containsKey(path)) {
+                    cb.processResult(KeeperException.Code.NONODE.intValue(), 
path, ctx, null);
+                    return;
+                }
 
-                    // Check version
-                    if (version != -1 && version != currentVersion) {
-                        log.debug("[{}] Current version: {} -- Expected: {}", 
path, currentVersion, version);
-                        unlockIfLocked();
-                        
cb.processResult(KeeperException.Code.BADVERSION.intValue(), path, ctx, null);
-                        return;
-                    }
+                MockZNode mockZNode = tree.get(path);
+                int currentVersion = mockZNode.getVersion();
 
-                    log.debug("[{}] Updating -- current version: {}", path, 
currentVersion);
-                    MockZNode newZNode = MockZNode.of(data, currentVersion + 
1, mockZNode.getEphemeralOwner());
-                    tree.put(path, newZNode);
-                    stat = createStatForZNode(newZNode);
-                } finally {
-                    unlockIfLocked();
+                // Check version
+                if (version != -1 && version != currentVersion) {
+                    log.debug("[{}] Current version: {} -- Expected: {}", 
path, currentVersion, version);
+                    Stat currentStat = mockZNode.getStat();
+                    
cb.processResult(KeeperException.Code.BADVERSION.intValue(), path, ctx, 
currentStat);
+                    return;
                 }
+
+                log.debug("[{}] Updating -- current version: {}", path, 
currentVersion);
+                mockZNode.updateData(data);
+                stat = mockZNode.getStat();
                 cb.processResult(0, path, ctx, stat);
 
-                toNotify.addAll(watchers.get(path));
+                toNotify.addAll(getWatchers(path));
                 watchers.removeAll(path);
 
-                for (Watcher watcher : toNotify) {
-                    watcher.process(new 
WatchedEvent(EventType.NodeDataChanged, KeeperState.SyncConnected, path));
-                }
+                runNotifications(() -> {
+                    triggerPersistentWatches(path, null, 
EventType.NodeDataChanged);
 
-                triggerPersistentWatches(path, null, 
EventType.NodeDataChanged);
+                    for (Watcher watcher : toNotify) {
+                        watcher.process(new 
WatchedEvent(EventType.NodeDataChanged, KeeperState.SyncConnected, path));
+                    }
+                });
             } catch (Throwable ex) {
                 log.error("Update data : {} error", path, ex);
                 cb.processResult(KeeperException.Code.SYSTEMERROR.intValue(), 
path, ctx, null);
@@ -878,50 +842,49 @@ public class MockZooKeeper extends ZooKeeper {
 
     @Override
     public void delete(final String path, int version) throws 
InterruptedException, KeeperException {
+        runInExecutorReturningValue(() -> {
+            internalDelete(path, version);
+            return null;
+        });
+    }
+
+    private void internalDelete(String path, int version) throws 
KeeperException {
         maybeThrowProgrammedFailure(Op.DELETE, path);
 
         final Set<Watcher> toNotifyDelete;
         final Set<Watcher> toNotifyParent;
         final String parent;
 
-        lock();
-        try {
-            if (stopped) {
-                throw new KeeperException.ConnectionLossException();
-            } else if (!tree.containsKey(path)) {
-                throw new KeeperException.NoNodeException(path);
-            } else if (hasChildren(path)) {
-                throw new KeeperException.NotEmptyException(path);
-            }
+        if (isStopped()) {
+            throw new KeeperException.ConnectionLossException();
+        } else if (!tree.containsKey(path)) {
+            throw new KeeperException.NoNodeException(path);
+        } else if (hasChildren(path)) {
+            throw new KeeperException.NotEmptyException(path);
+        }
 
-            if (version != -1) {
-                int currentVersion = tree.get(path).getVersion();
-                if (version != currentVersion) {
-                    throw new KeeperException.BadVersionException(path);
-                }
+        if (version != -1) {
+            int currentVersion = tree.get(path).getVersion();
+            if (version != currentVersion) {
+                throw new KeeperException.BadVersionException(path);
             }
+        }
 
-            tree.remove(path);
-
-            toNotifyDelete = Sets.newHashSet();
-            toNotifyDelete.addAll(watchers.get(path));
+        parent = getParentName(path);
+        tree.remove(path);
+        tree.get(parent).getChildren().remove(getNodeName(path));
 
-            toNotifyParent = Sets.newHashSet();
-            parent = path.substring(0, path.lastIndexOf("/"));
-            if (!parent.isEmpty()) {
-                toNotifyParent.addAll(watchers.get(parent));
-            }
+        toNotifyDelete = Sets.newHashSet();
+        toNotifyDelete.addAll(getWatchers(path));
 
-            watchers.removeAll(path);
-        } finally {
-            unlockIfLocked();
+        toNotifyParent = Sets.newHashSet();
+        if (!ROOT_PATH.equals(parent)) {
+            toNotifyParent.addAll(getWatchers(parent));
         }
 
-        executor.execute(() -> {
-            if (stopped) {
-                return;
-            }
+        watchers.removeAll(path);
 
+        runNotifications(() -> {
             for (Watcher watcher1 : toNotifyDelete) {
                 watcher1.process(new WatchedEvent(EventType.NodeDeleted, 
KeeperState.SyncConnected, path));
             }
@@ -935,179 +898,209 @@ public class MockZooKeeper extends ZooKeeper {
 
     @Override
     public void delete(final String path, int version, final VoidCallback cb, 
final Object ctx) {
-        Runnable r = () -> {
+        if (isStopped()) {
+            cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), 
path, ctx);
+            return;
+        }
+        runInExecutorAsync(() -> {
             try {
-                lock();
                 final Set<Watcher> toNotifyDelete = Sets.newHashSet();
-                toNotifyDelete.addAll(watchers.get(path));
+                toNotifyDelete.addAll(getWatchers(path));
 
                 final Set<Watcher> toNotifyParent = Sets.newHashSet();
-                final String parent = path.substring(0, path.lastIndexOf("/"));
-                if (!parent.isEmpty()) {
-                    toNotifyParent.addAll(watchers.get(parent));
+                final String parent = getParentName(path);
+                if (!ROOT_PATH.equals(parent)) {
+                    toNotifyParent.addAll(getWatchers(parent));
                 }
                 watchers.removeAll(path);
 
                 Optional<KeeperException.Code> failure = 
programmedFailure(Op.DELETE, path);
                 if (failure.isPresent()) {
-                    unlockIfLocked();
                     cb.processResult(failure.get().intValue(), path, ctx);
-                } else if (stopped) {
-                    unlockIfLocked();
+                } else if (isStopped()) {
                     
cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx);
                 } else if (!tree.containsKey(path)) {
-                    unlockIfLocked();
                     cb.processResult(KeeperException.Code.NONODE.intValue(), 
path, ctx);
                 } else if (hasChildren(path)) {
-                    unlockIfLocked();
                     cb.processResult(KeeperException.Code.NOTEMPTY.intValue(), 
path, ctx);
                 } else {
                     if (version != -1) {
                         int currentVersion = tree.get(path).getVersion();
                         if (version != currentVersion) {
-                            unlockIfLocked();
                             
cb.processResult(KeeperException.Code.BADVERSION.intValue(), path, ctx);
                             return;
                         }
                     }
 
                     tree.remove(path);
-
-                    unlockIfLocked();
+                    tree.get(parent).getChildren().remove(getNodeName(path));
                     cb.processResult(0, path, ctx);
 
-                    toNotifyDelete.forEach(watcher -> watcher
-                            .process(new WatchedEvent(EventType.NodeDeleted, 
KeeperState.SyncConnected, path)));
-                    toNotifyParent.forEach(watcher -> watcher
-                            .process(new 
WatchedEvent(EventType.NodeChildrenChanged, KeeperState.SyncConnected,
-                                    parent)));
-                    triggerPersistentWatches(path, parent, 
EventType.NodeDeleted);
+                    runNotifications(() -> {
+                        triggerPersistentWatches(path, parent, 
EventType.NodeDeleted);
+                        toNotifyDelete.forEach(watcher -> watcher
+                                .process(new 
WatchedEvent(EventType.NodeDeleted, KeeperState.SyncConnected, path)));
+                        toNotifyParent.forEach(watcher -> watcher
+                                .process(new 
WatchedEvent(EventType.NodeChildrenChanged, KeeperState.SyncConnected,
+                                        parent)));
+                    });
                 }
             } catch (Throwable ex) {
                 log.error("delete path : {} error", path, ex);
                 cb.processResult(KeeperException.Code.SYSTEMERROR.intValue(), 
path, ctx);
-            } finally {
-                unlockIfLocked();
             }
-        };
-
-        try {
-            executor.execute(r);
-        } catch (RejectedExecutionException ree) {
-            cb.processResult(KeeperException.Code.SESSIONEXPIRED.intValue(), 
path, ctx);
-        }
-
+        });
     }
 
     @Override
     public void multi(Iterable<org.apache.zookeeper.Op> ops, 
AsyncCallback.MultiCallback cb, Object ctx) {
-        try {
-            List<OpResult> res = multi(ops);
-            cb.processResult(KeeperException.Code.OK.intValue(), null, ctx, 
res);
-        } catch (Exception e) {
-            cb.processResult(KeeperException.Code.APIERROR.intValue(), null, 
ctx, null);
+        if (isStopped()) {
+            cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), 
null, ctx, null);
+            return;
         }
+        runInExecutorAsync(() -> {
+            try {
+                List<OpResult> res = multi(ops);
+                cb.processResult(KeeperException.Code.OK.intValue(), null, 
ctx, res);
+            } catch (Exception e) {
+                cb.processResult(KeeperException.Code.APIERROR.intValue(), 
null, ctx, null);
+            }
+        });
     }
 
     @Override
     public List<OpResult> multi(Iterable<org.apache.zookeeper.Op> ops) throws 
InterruptedException, KeeperException {
+        return runInExecutorReturningValue(() -> internalMulti(ops));
+    }
+
+    private List<OpResult> internalMulti(Iterable<org.apache.zookeeper.Op> 
ops) {
         List<OpResult> res = new ArrayList<>();
-        try {
-            for (org.apache.zookeeper.Op op : ops) {
-                switch (op.getType()) {
-                    case ZooDefs.OpCode.create -> {
+        for (org.apache.zookeeper.Op op : ops) {
+            switch (op.getType()) {
+                case ZooDefs.OpCode.create -> {
+                    handleOperation("create", op, () -> {
                         org.apache.zookeeper.Op.Create opc = 
((org.apache.zookeeper.Op.Create) op);
                         CreateMode cm = CreateMode.fromFlag(opc.flags);
-                        String path = this.create(op.getPath(), opc.data, 
null, cm);
+                        String path = create(op.getPath(), opc.data, null, cm);
                         res.add(new OpResult.CreateResult(path));
-                    }
-                    case ZooDefs.OpCode.delete -> {
-                        this.delete(op.getPath(), (int) 
FieldUtils.readField(op, "version", true));
+                    }, res);
+                }
+                case ZooDefs.OpCode.delete -> {
+                    handleOperation("delete", op, () -> {
+                        DeleteRequest deleteRequest = (DeleteRequest) 
op.toRequestRecord();
+                        delete(op.getPath(), deleteRequest.getVersion());
                         res.add(new OpResult.DeleteResult());
-                    }
-                    case ZooDefs.OpCode.setData -> {
-                        Stat stat = this.setData(
-                                op.getPath(),
-                                (byte[]) FieldUtils.readField(op, "data", 
true),
-                                (int) FieldUtils.readField(op, "version", 
true));
+                    }, res);
+                }
+                case ZooDefs.OpCode.setData -> {
+                    handleOperation("setData", op, () -> {
+                        SetDataRequest setDataRequest = (SetDataRequest) 
op.toRequestRecord();
+                        Stat stat = setData(op.getPath(), 
setDataRequest.getData(), setDataRequest.getVersion());
                         res.add(new OpResult.SetDataResult(stat));
-                    }
-                    case ZooDefs.OpCode.getChildren -> {
-                        try {
-                            List<String> children = 
this.getChildren(op.getPath(), null);
-                            res.add(new OpResult.GetChildrenResult(children));
-                        } catch (KeeperException e) {
-                            res.add(new 
OpResult.ErrorResult(e.code().intValue()));
-                        }
-                    }
-                    case ZooDefs.OpCode.getData -> {
-                        Stat stat = new Stat();
-                        try {
-                            byte[] payload = this.getData(op.getPath(), null, 
stat);
-                            res.add(new OpResult.GetDataResult(payload, stat));
-                        } catch (KeeperException e) {
-                            res.add(new 
OpResult.ErrorResult(e.code().intValue()));
-                        }
-                    }
+                    }, res);
+                }
+                case ZooDefs.OpCode.getChildren -> {
+                    handleOperation("getChildren", op, () -> {
+                        List<String> children = getChildren(op.getPath(), 
null);
+                        res.add(new OpResult.GetChildrenResult(children));
+                    }, res);
+                }
+                case ZooDefs.OpCode.getData -> {
+                    Stat stat = new Stat();
+                    handleOperation("getData", op, () -> {
+                        byte[] payload = getData(op.getPath(), null, stat);
+                        res.add(new OpResult.GetDataResult(payload, stat));
+                    }, res);
+                }
+                default -> {
+                    log.error("Unsupported operation for path {} type {} kind 
{} request {}", op.getPath(),
+                            op.getType(), op.getKind(), op.toRequestRecord());
+                    res.add(new 
OpResult.ErrorResult(KeeperException.Code.APIERROR.intValue()));
                 }
             }
-        } catch (KeeperException e) {
-            res.add(new OpResult.ErrorResult(e.code().intValue()));
-            int total = Iterables.size(ops);
-            for (int i = res.size(); i < total; i++) {
+        }
+        return res;
+    }
+
+    interface ZkOpHandler {
+        void handle() throws KeeperException, InterruptedException;
+    }
+
+    private void handleOperation(String opName, org.apache.zookeeper.Op op, 
ZkOpHandler handler, List<OpResult> res) {
+        try {
+            handler.handle();
+        } catch (Exception e) {
+            if (e instanceof KeeperException keeperException) {
+                res.add(new 
OpResult.ErrorResult(keeperException.code().intValue()));
+            } else {
+                log.error("Error handling {} operation for path {} type {} 
kind {} request {}", opName, op.getPath(),
+                        op.getType(), op.getKind(), op.toRequestRecord(), e);
                 res.add(new 
OpResult.ErrorResult(KeeperException.Code.RUNTIMEINCONSISTENCY.intValue()));
             }
-        } catch (IllegalAccessException e) {
-            throw new IllegalStateException(e);
         }
-        return res;
     }
 
     @Override
-    public synchronized void addWatch(String basePath, Watcher watcher, 
AddWatchMode mode) {
-        persistentWatchers.add(new PersistentWatcher(basePath, watcher, mode));
+    public void addWatch(String basePath, Watcher watcher, AddWatchMode mode) {
+        runInExecutorSync(() -> {
+            persistentWatchers.add(new PersistentWatcher(basePath, watcher, 
mode, getSessionId()));
+        });
     }
 
     @Override
     public void addWatch(String basePath, Watcher watcher, AddWatchMode mode, 
VoidCallback cb, Object ctx) {
-        if (stopped) {
+        if (isStopped()) {
             cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), 
basePath, ctx);
             return;
         }
-
-        executor.execute(() -> {
-            synchronized (MockZooKeeper.this) {
-                persistentWatchers.add(new PersistentWatcher(basePath, 
watcher, mode));
-            }
-
+        runInExecutorAsync(() -> {
+            addWatch(basePath, watcher, mode);
             cb.processResult(KeeperException.Code.OK.intValue(), basePath, 
ctx);
         });
     }
 
+    public synchronized void increaseRefCount() {
+        referenceCount++;
+    }
+
+    public synchronized MockZooKeeper registerCloseable(AutoCloseable 
closeable) {
+        closeables.add(closeable);
+        return this;
+    }
+
     @Override
-    public void close() throws InterruptedException {
-        shutdown();
+    public synchronized void close() throws InterruptedException {
+        if (--referenceCount <= 0) {
+            shutdown();
+            closeables.forEach(c -> {
+                try {
+                    c.close();
+                } catch (Exception e) {
+                    log.error("Error closing closeable", e);
+                }
+            });
+            closeables.clear();
+        }
     }
 
     public void shutdown() throws InterruptedException {
-        lock();
-        try {
-            stopped = true;
-            tree.clear();
-            watchers.clear();
+        if (stopped.compareAndSet(false, true)) {
+            Future<?> shutdownTask = executor.submit(() -> {
+                tree.clear();
+                watchers.clear();
+                persistentWatchers.clear();
+            });
             try {
-                executor.shutdownNow();
-                executor.awaitTermination(5, TimeUnit.SECONDS);
-            } catch (InterruptedException ex) {
-                log.error("MockZooKeeper shutdown had error", ex);
+                shutdownTask.get();
+            } catch (ExecutionException e) {
+                log.error("Error shutting down", e);
             }
-        } finally {
-            unlockIfLocked();
+            MoreExecutors.shutdownAndAwaitTermination(executor, 10, 
TimeUnit.SECONDS);
         }
     }
 
     Optional<KeeperException.Code> programmedFailure(Op op, String path) {
-        KeeperException.Code error = this.alwaysFail.get();
+        KeeperException.Code error = alwaysFail.get();
         if (error != KeeperException.Code.OK) {
             return Optional.of(error);
         }
@@ -1144,26 +1137,17 @@ public class MockZooKeeper extends ZooKeeper {
     }
 
     public void setAlwaysFail(KeeperException.Code rc) {
-        this.alwaysFail.set(rc);
+        alwaysFail.set(rc);
     }
 
     public void unsetAlwaysFail() {
-        this.alwaysFail.set(KeeperException.Code.OK);
+        alwaysFail.set(KeeperException.Code.OK);
     }
 
     public void setSessionId(long id) {
         sessionId = id;
     }
 
-    @Override
-    public long getSessionId() {
-        return sessionId;
-    }
-
-    private boolean hasChildren(String path) {
-        return !tree.subMap(path + '/', path + '0').isEmpty();
-    }
-
     @Override
     public String toString() {
         return "MockZookeeper";
@@ -1182,11 +1166,11 @@ public class MockZooKeeper extends ZooKeeper {
     private void triggerPersistentWatches(String path, String parent, 
EventType eventType) {
         persistentWatchers.forEach(w -> {
             if (w.mode == AddWatchMode.PERSISTENT_RECURSIVE) {
-                if (path.startsWith(w.getPath())) {
+                if (path.startsWith(w.path())) {
                     w.watcher.process(new WatchedEvent(eventType, 
KeeperState.SyncConnected, path));
                 }
             } else if (w.mode == AddWatchMode.PERSISTENT) {
-                if (w.getPath().equals(path)) {
+                if (w.path().equals(path)) {
                     w.watcher.process(new WatchedEvent(eventType, 
KeeperState.SyncConnected, path));
                 }
 
@@ -1199,5 +1183,26 @@ public class MockZooKeeper extends ZooKeeper {
         });
     }
 
+    public void deleteEphemeralNodes(long sessionId) {
+        if (sessionId != NOT_EPHEMERAL) {
+            runInExecutorSync(() -> {
+                tree.values().removeIf(zNode -> zNode.getEphemeralOwner() == 
sessionId);
+            });
+        }
+    }
+
+
+    public void deleteWatchers(long sessionId) {
+        runInExecutorSync(() -> {
+            // remove all persistent watchers for the session
+            persistentWatchers.removeIf(w -> w.sessionId == sessionId);
+            // remove all watchers for the session
+            List<Map.Entry<String, NodeWatcher>> watchersForSession =
+                    watchers.entries().stream().filter(e -> 
e.getValue().sessionId == sessionId).toList();
+            watchersForSession
+                    .forEach(e -> watchers.remove(e.getKey(), e.getValue()));
+        });
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(MockZooKeeper.class);
 }
diff --git 
a/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeperSession.java 
b/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeperSession.java
index a286a75aa91..c812423b728 100644
--- a/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeperSession.java
+++ b/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeperSession.java
@@ -40,7 +40,7 @@ public class MockZooKeeperSession extends ZooKeeper {
 
     private MockZooKeeper mockZooKeeper;
 
-    private long sessionId = 0L;
+    private long sessionId = 1L;
 
     private static final Objenesis objenesis = new ObjenesisStd();
 
@@ -59,6 +59,9 @@ public class MockZooKeeperSession extends ZooKeeper {
         mockZooKeeperSession.mockZooKeeper = mockZooKeeper;
         mockZooKeeperSession.sessionId = sessionIdGenerator.getAndIncrement();
         mockZooKeeperSession.closeMockZooKeeperOnClose = 
closeMockZooKeeperOnClose;
+        if (closeMockZooKeeperOnClose) {
+            mockZooKeeper.increaseRefCount();
+        }
         return mockZooKeeperSession;
     }
 
@@ -81,17 +84,22 @@ public class MockZooKeeperSession extends ZooKeeper {
 
     @Override
     public void register(Watcher watcher) {
-        mockZooKeeper.register(watcher);
+        try {
+            mockZooKeeper.overrideSessionId(getSessionId());
+            mockZooKeeper.register(watcher);
+        } finally {
+            mockZooKeeper.removeSessionIdOverride();
+        }
     }
 
     @Override
     public String create(String path, byte[] data, List<ACL> acl, CreateMode 
createMode)
             throws KeeperException, InterruptedException {
         try {
-            mockZooKeeper.overrideEpheralOwner(getSessionId());
+            mockZooKeeper.overrideSessionId(getSessionId());
             return mockZooKeeper.create(path, data, acl, createMode);
         } finally {
-            mockZooKeeper.removeEpheralOwnerOverride();
+            mockZooKeeper.removeSessionIdOverride();
         }
     }
 
@@ -99,134 +107,257 @@ public class MockZooKeeperSession extends ZooKeeper {
     public void create(final String path, final byte[] data, final List<ACL> 
acl, CreateMode createMode,
                        final AsyncCallback.StringCallback cb, final Object 
ctx) {
         try {
-            mockZooKeeper.overrideEpheralOwner(getSessionId());
+            mockZooKeeper.overrideSessionId(getSessionId());
             mockZooKeeper.create(path, data, acl, createMode, cb, ctx);
         } finally {
-            mockZooKeeper.removeEpheralOwnerOverride();
+            mockZooKeeper.removeSessionIdOverride();
         }
     }
 
     @Override
-    public byte[] getData(String path, Watcher watcher, Stat stat) throws 
KeeperException {
-        return mockZooKeeper.getData(path, watcher, stat);
+    public byte[] getData(String path, Watcher watcher, Stat stat) throws 
KeeperException, InterruptedException {
+        try {
+            mockZooKeeper.overrideSessionId(getSessionId());
+            return mockZooKeeper.getData(path, watcher, stat);
+        } finally {
+            mockZooKeeper.removeSessionIdOverride();
+        }
     }
 
     @Override
     public void getData(final String path, boolean watch, final DataCallback 
cb, final Object ctx) {
-        mockZooKeeper.getData(path, watch, cb, ctx);
+        try {
+            mockZooKeeper.overrideSessionId(getSessionId());
+            mockZooKeeper.getData(path, watch, cb, ctx);
+        } finally {
+            mockZooKeeper.removeSessionIdOverride();
+        }
     }
 
     @Override
     public void getData(final String path, final Watcher watcher, final 
DataCallback cb, final Object ctx) {
-        mockZooKeeper.getData(path, watcher, cb, ctx);
+        try {
+            mockZooKeeper.overrideSessionId(getSessionId());
+            mockZooKeeper.getData(path, watcher, cb, ctx);
+        } finally {
+            mockZooKeeper.removeSessionIdOverride();
+        }
     }
 
     @Override
     public void getChildren(final String path, final Watcher watcher, final 
ChildrenCallback cb, final Object ctx) {
-        mockZooKeeper.getChildren(path, watcher, cb, ctx);
+        try {
+            mockZooKeeper.overrideSessionId(getSessionId());
+            mockZooKeeper.getChildren(path, watcher, cb, ctx);
+        } finally {
+            mockZooKeeper.removeSessionIdOverride();
+        }
     }
 
     @Override
-    public List<String> getChildren(String path, Watcher watcher) throws 
KeeperException {
-        return mockZooKeeper.getChildren(path, watcher);
+    public List<String> getChildren(String path, Watcher watcher) throws 
KeeperException, InterruptedException {
+        try {
+            mockZooKeeper.overrideSessionId(getSessionId());
+            return mockZooKeeper.getChildren(path, watcher);
+        } finally {
+            mockZooKeeper.removeSessionIdOverride();
+        }
     }
 
     @Override
     public List<String> getChildren(String path, boolean watch) throws 
KeeperException, InterruptedException {
-        return mockZooKeeper.getChildren(path, watch);
+        try {
+            mockZooKeeper.overrideSessionId(getSessionId());
+            return mockZooKeeper.getChildren(path, watch);
+        } finally {
+            mockZooKeeper.removeSessionIdOverride();
+        }
     }
 
     @Override
     public void getChildren(final String path, boolean watcher, final 
Children2Callback cb, final Object ctx) {
-        mockZooKeeper.getChildren(path, watcher, cb, ctx);
+        try {
+            mockZooKeeper.overrideSessionId(getSessionId());
+            mockZooKeeper.getChildren(path, watcher, cb, ctx);
+        } finally {
+            mockZooKeeper.removeSessionIdOverride();
+        }
     }
 
     @Override
     public Stat exists(String path, boolean watch) throws KeeperException, 
InterruptedException {
-        return mockZooKeeper.exists(path, watch);
+        try {
+            mockZooKeeper.overrideSessionId(getSessionId());
+            return mockZooKeeper.exists(path, watch);
+        } finally {
+            mockZooKeeper.removeSessionIdOverride();
+        }
     }
 
     @Override
     public Stat exists(String path, Watcher watcher) throws KeeperException, 
InterruptedException {
-        return mockZooKeeper.exists(path, watcher);
+        try {
+            mockZooKeeper.overrideSessionId(getSessionId());
+            return mockZooKeeper.exists(path, watcher);
+        } finally {
+            mockZooKeeper.removeSessionIdOverride();
+        }
     }
 
     @Override
     public void exists(String path, boolean watch, StatCallback cb, Object 
ctx) {
-        mockZooKeeper.exists(path, watch, cb, ctx);
+        try {
+            mockZooKeeper.overrideSessionId(getSessionId());
+            mockZooKeeper.exists(path, watch, cb, ctx);
+        } finally {
+            mockZooKeeper.removeSessionIdOverride();
+        }
     }
 
     @Override
     public void exists(String path, Watcher watcher, StatCallback cb, Object 
ctx) {
-        mockZooKeeper.exists(path, watcher, cb, ctx);
+        try {
+            mockZooKeeper.overrideSessionId(getSessionId());
+            mockZooKeeper.exists(path, watcher, cb, ctx);
+        } finally {
+            mockZooKeeper.removeSessionIdOverride();
+        }
     }
 
     @Override
     public void sync(String path, VoidCallback cb, Object ctx) {
-        mockZooKeeper.sync(path, cb, ctx);
+        try {
+            mockZooKeeper.overrideSessionId(getSessionId());
+            mockZooKeeper.sync(path, cb, ctx);
+        } finally {
+            mockZooKeeper.removeSessionIdOverride();
+        }
     }
 
     @Override
     public Stat setData(final String path, byte[] data, int version) throws 
KeeperException, InterruptedException {
-        return mockZooKeeper.setData(path, data, version);
+        try {
+            mockZooKeeper.overrideSessionId(getSessionId());
+            return mockZooKeeper.setData(path, data, version);
+        } finally {
+            mockZooKeeper.removeSessionIdOverride();
+        }
     }
 
     @Override
     public void setData(final String path, final byte[] data, int version, 
final StatCallback cb, final Object ctx) {
-        mockZooKeeper.setData(path, data, version, cb, ctx);
+        try {
+            mockZooKeeper.overrideSessionId(getSessionId());
+            mockZooKeeper.setData(path, data, version, cb, ctx);
+        } finally {
+            mockZooKeeper.removeSessionIdOverride();
+        }
     }
 
     @Override
     public void delete(final String path, int version) throws 
InterruptedException, KeeperException {
-        mockZooKeeper.delete(path, version);
+        try {
+            mockZooKeeper.overrideSessionId(getSessionId());
+            mockZooKeeper.delete(path, version);
+        } finally {
+            mockZooKeeper.removeSessionIdOverride();
+        }
     }
 
     @Override
     public void delete(final String path, int version, final VoidCallback cb, 
final Object ctx) {
-        mockZooKeeper.delete(path, version, cb, ctx);
+        try {
+            mockZooKeeper.overrideSessionId(getSessionId());
+            mockZooKeeper.delete(path, version, cb, ctx);
+        } finally {
+            mockZooKeeper.removeSessionIdOverride();
+        }
     }
 
     @Override
     public void multi(Iterable<org.apache.zookeeper.Op> ops, 
AsyncCallback.MultiCallback cb, Object ctx) {
-        mockZooKeeper.multi(ops, cb, ctx);
+        try {
+            mockZooKeeper.overrideSessionId(getSessionId());
+            mockZooKeeper.multi(ops, cb, ctx);
+        } finally {
+            mockZooKeeper.removeSessionIdOverride();
+        }
     }
 
     @Override
     public List<OpResult> multi(Iterable<org.apache.zookeeper.Op> ops) throws 
InterruptedException, KeeperException {
-        return mockZooKeeper.multi(ops);
+        try {
+            mockZooKeeper.overrideSessionId(getSessionId());
+            return mockZooKeeper.multi(ops);
+        } finally {
+            mockZooKeeper.removeSessionIdOverride();
+        }
     }
 
     @Override
     public void addWatch(String basePath, Watcher watcher, AddWatchMode mode, 
VoidCallback cb, Object ctx) {
-        mockZooKeeper.addWatch(basePath, watcher, mode, cb, ctx);
+        try {
+            mockZooKeeper.overrideSessionId(getSessionId());
+            mockZooKeeper.addWatch(basePath, watcher, mode, cb, ctx);
+        } finally {
+            mockZooKeeper.removeSessionIdOverride();
+        }
     }
 
     @Override
     public void addWatch(String basePath, Watcher watcher, AddWatchMode mode)
             throws KeeperException, InterruptedException {
-        mockZooKeeper.addWatch(basePath, watcher, mode);
+        try {
+            mockZooKeeper.overrideSessionId(getSessionId());
+            mockZooKeeper.addWatch(basePath, watcher, mode);
+        } finally {
+            mockZooKeeper.removeSessionIdOverride();
+        }
     }
 
     @Override
     public void addWatch(String basePath, AddWatchMode mode) throws 
KeeperException, InterruptedException {
-        mockZooKeeper.addWatch(basePath, mode);
+        try {
+            mockZooKeeper.overrideSessionId(getSessionId());
+            mockZooKeeper.addWatch(basePath, mode);
+        } finally {
+            mockZooKeeper.removeSessionIdOverride();
+        }
     }
 
     @Override
     public void addWatch(String basePath, AddWatchMode mode, VoidCallback cb, 
Object ctx) {
-        mockZooKeeper.addWatch(basePath, mode, cb, ctx);
+        try {
+            mockZooKeeper.overrideSessionId(getSessionId());
+            mockZooKeeper.addWatch(basePath, mode, cb, ctx);
+        } finally {
+            mockZooKeeper.removeSessionIdOverride();
+        }
     }
 
     @Override
     public void close() throws InterruptedException {
-        if (closeMockZooKeeperOnClose) {
-            mockZooKeeper.close();
-        }
+        internalClose(false);
     }
 
     public void shutdown() throws InterruptedException {
-        if (closeMockZooKeeperOnClose) {
-            mockZooKeeper.shutdown();
+        internalClose(true);
+    }
+
+    private void internalClose(boolean shutdown) throws InterruptedException {
+        try {
+            mockZooKeeper.overrideSessionId(getSessionId());
+            mockZooKeeper.deleteEphemeralNodes(getSessionId());
+            mockZooKeeper.deleteWatchers(getSessionId());
+            if (closeMockZooKeeperOnClose) {
+                if (shutdown) {
+                    mockZooKeeper.shutdown();
+                } else {
+                    mockZooKeeper.close();
+                }
+            }
+        } finally {
+            mockZooKeeper.removeSessionIdOverride();
         }
     }
 


Reply via email to