This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 67ae209e995017fe41ac6a9c998aa7e63576c837 Author: Lari Hotari <[email protected]> AuthorDate: Mon Feb 17 18:54:43 2025 +0200 [fix][meta] Fix ephemeral handling of ZK nodes and fix MockZooKeeper ephemeral and ZK stat handling (#23988) (cherry picked from commit df5197212e8806c7d1907dedfcdfd9e40a4f0ea5) --- .../broker/loadbalance/impl/LoadManagerShared.java | 4 +- .../loadbalance/impl/ModularLoadManagerImpl.java | 2 +- .../pulsar/broker/web/PulsarWebResource.java | 2 +- .../pulsar/common/naming/NamespaceBundles.java | 8 +- .../broker/auth/MockedPulsarServiceBaseTest.java | 27 +- .../broker/testcontext/PulsarTestContext.java | 93 +- .../pulsar/client/api/BrokerServiceLookupTest.java | 113 +- .../pulsar/client/api/ProducerConsumerBase.java | 2 +- .../pulsar/metadata/impl/ZKMetadataStore.java | 14 +- .../replication/BookKeeperClusterTestCase.java | 4 + .../pulsar/metadata/BaseMetadataStoreTest.java | 124 +- .../org/apache/pulsar/metadata/CounterTest.java | 3 +- .../apache/pulsar/metadata/MetadataCacheTest.java | 29 +- .../pulsar/metadata/MetadataStoreExtendedTest.java | 6 +- .../apache/pulsar/metadata/MetadataStoreTest.java | 28 +- .../MockZooKeeperMetadataStoreProvider.java | 49 + .../bookkeeper/LedgerManagerIteratorTest.java | 18 +- .../LedgerUnderreplicationManagerTest.java | 8 +- .../bookkeeper/PulsarLedgerIdGeneratorTest.java | 2 +- .../impl/MetadataStoreFactoryImplTest.java | 21 +- .../java/org/apache/zookeeper/MockZooKeeper.java | 1199 ++++++++++---------- .../org/apache/zookeeper/MockZooKeeperSession.java | 201 +++- 22 files changed, 1202 insertions(+), 755 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java index 3d627db6cfa..db2122331a5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java @@ -305,7 +305,7 @@ public class LoadManagerShared { public static String getBundleRangeFromBundleName(String bundleName) { // the bundle format is property/cluster/namespace/0x00000000_0xFFFFFFFF int pos = bundleName.lastIndexOf("/"); - checkArgument(pos != -1); + checkArgument(pos != -1, "Invalid bundle name format: %s", bundleName); return bundleName.substring(pos + 1); } @@ -313,7 +313,7 @@ public class LoadManagerShared { public static String getNamespaceNameFromBundleName(String bundleName) { // the bundle format is property/cluster/namespace/0x00000000_0xFFFFFFFF int pos = bundleName.lastIndexOf('/'); - checkArgument(pos != -1); + checkArgument(pos != -1, "Invalid bundle name format: %s", bundleName); return bundleName.substring(0, pos); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java index 52706a1b02b..2c74e0ffca2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java @@ -614,7 +614,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager { for (String bundle : bundleData.keySet()) { if (!activeBundles.contains(bundle)){ bundleData.remove(bundle); - if (pulsar.getLeaderElectionService().isLeader()){ + if (pulsar.getLeaderElectionService() != null && pulsar.getLeaderElectionService().isLeader()){ deleteBundleDataFromMetadataStore(bundle); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java index f9b1f0d08c0..914ca0b82bb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java @@ -1016,7 +1016,7 @@ public abstract class PulsarWebResource { if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) { return true; } - return pulsar.getLeaderElectionService().isLeader(); + return pulsar.getLeaderElectionService() != null && pulsar.getLeaderElectionService().isLeader(); } public void validateTenantOperation(String tenant, TenantOperation operation) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java index cb7e135662c..900aa5336ba 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java @@ -113,9 +113,11 @@ public class NamespaceBundles { public void validateBundle(NamespaceBundle nsBundle) throws Exception { int idx = Arrays.binarySearch(partitions, nsBundle.getLowerEndpoint()); - checkArgument(idx >= 0, "Cannot find bundle in the bundles list"); - checkArgument(nsBundle.getUpperEndpoint().equals(bundles.get(idx).getUpperEndpoint()), - "Invalid upper boundary for bundle"); + checkArgument(idx >= 0, "Cannot find bundle %s in the bundles list", nsBundle); + NamespaceBundle foundBundle = bundles.get(idx); + Long upperEndpoint = foundBundle.getUpperEndpoint(); + checkArgument(nsBundle.getUpperEndpoint().equals(upperEndpoint), + "Invalid upper boundary for bundle %s. Expected upper boundary of %s", nsBundle, foundBundle); } public NamespaceBundle getFullBundle() { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java index 29d350bbf90..377b183efda 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java @@ -141,6 +141,9 @@ public abstract class MockedPulsarServiceBaseTest extends TestRetrySupport { protected boolean enableBrokerInterceptor = false; + // Set to true in test's constructor to use a real Zookeeper (TestZKServer) + protected boolean useTestZookeeper; + public MockedPulsarServiceBaseTest() { resetConfig(); } @@ -309,7 +312,14 @@ public abstract class MockedPulsarServiceBaseTest extends TestRetrySupport { * @throws Exception if an error occurs */ protected void restartBroker() throws Exception { + restartBroker(null); + } + + protected void restartBroker(Consumer<ServiceConfiguration> configurationChanger) throws Exception { stopBroker(); + if (configurationChanger != null) { + configurationChanger.accept(conf); + } startBroker(); } @@ -400,7 +410,6 @@ public abstract class MockedPulsarServiceBaseTest extends TestRetrySupport { PulsarTestContext.Builder builder = PulsarTestContext.builder() .spyByDefault() .config(conf) - .withMockZookeeper(true) .pulsarServiceCustomizer(pulsarService -> { try { beforePulsarStart(pulsarService); @@ -409,9 +418,25 @@ public abstract class MockedPulsarServiceBaseTest extends TestRetrySupport { } }) .brokerServiceCustomizer(this::customizeNewBrokerService); + configureMetadataStores(builder); return builder; } + /** + * Configures the metadata stores for the PulsarTestContext.Builder instance. + * Set useTestZookeeper to true in the test's constructor to use TestZKServer which is a real ZooKeeper + * implementation. + * + * @param builder the PulsarTestContext.Builder instance to configure + */ + protected void configureMetadataStores(PulsarTestContext.Builder builder) { + if (useTestZookeeper) { + builder.withTestZookeeper(); + } else { + builder.withMockZookeeper(true); + } + } + /** * This method can be used in test classes for creating additional PulsarTestContext instances * that share the same mock ZooKeeper and BookKeeper instances as the main PulsarTestContext instance. diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java index d62600829da..feb0be3d947 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java @@ -19,12 +19,10 @@ package org.apache.pulsar.broker.testcontext; -import com.google.common.util.concurrent.MoreExecutors; import io.netty.channel.EventLoopGroup; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.time.Duration; -import java.util.ArrayList; import java.util.List; import java.util.Optional; import java.util.function.Consumer; @@ -33,6 +31,7 @@ import lombok.AccessLevel; import lombok.Builder; import lombok.Getter; import lombok.Singular; +import lombok.SneakyThrows; import lombok.ToString; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.client.BookKeeper; @@ -56,6 +55,7 @@ import org.apache.pulsar.broker.storage.ManagedLedgerStorage; import org.apache.pulsar.common.util.GracefulExecutorServicesShutdown; import org.apache.pulsar.common.util.PortManager; import org.apache.pulsar.compaction.Compactor; +import org.apache.pulsar.metadata.TestZKServer; import org.apache.pulsar.metadata.api.MetadataStore; import org.apache.pulsar.metadata.api.MetadataStoreConfig; import org.apache.pulsar.metadata.api.MetadataStoreException; @@ -63,9 +63,11 @@ import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; import org.apache.pulsar.metadata.impl.MetadataStoreFactoryImpl; import org.apache.pulsar.metadata.impl.ZKMetadataStore; import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.MockZooKeeper; import org.apache.zookeeper.MockZooKeeperSession; -import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.ZooKeeper; import org.jetbrains.annotations.NotNull; import org.mockito.Mockito; import org.mockito.internal.util.MockUtil; @@ -150,6 +152,10 @@ public class PulsarTestContext implements AutoCloseable { private final MockZooKeeper mockZooKeeperGlobal; + private final TestZKServer testZKServer; + + private final TestZKServer testZKServerGlobal; + private final SpyConfig spyConfig; private final boolean startable; @@ -377,6 +383,11 @@ public class PulsarTestContext implements AutoCloseable { if (otherContext.getMockZooKeeperGlobal() != null) { mockZooKeeperGlobal(otherContext.getMockZooKeeperGlobal()); } + } else if (otherContext.getTestZKServer() != null) { + testZKServer(otherContext.getTestZKServer()); + if (otherContext.getTestZKServerGlobal() != null) { + testZKServerGlobal(otherContext.getTestZKServerGlobal()); + } } else { localMetadataStore(NonClosingProxyHandler.createNonClosingProxy(otherContext.getLocalMetadataStore(), MetadataStoreExtended.class @@ -436,17 +447,56 @@ public class PulsarTestContext implements AutoCloseable { } private MockZooKeeper createMockZooKeeper() throws Exception { - MockZooKeeper zk = MockZooKeeper.newInstance(MoreExecutors.newDirectExecutorService()); - List<ACL> dummyAclList = new ArrayList<>(0); + MockZooKeeper zk = MockZooKeeper.newInstance(); + initializeZookeeper(zk); + registerCloseable(zk::shutdown); + return zk; + } + private static void initializeZookeeper(ZooKeeper zk) throws KeeperException, InterruptedException { ZkUtils.createFullPathOptimistic(zk, "/ledgers/available/192.168.1.1:" + 5000, - "".getBytes(StandardCharsets.UTF_8), dummyAclList, CreateMode.PERSISTENT); + "".getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - zk.create("/ledgers/LAYOUT", "1\nflat:1".getBytes(StandardCharsets.UTF_8), dummyAclList, + zk.create("/ledgers/LAYOUT", "1\nflat:1".getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + } - registerCloseable(zk::shutdown); - return zk; + /** + * Configure this PulsarTestContext to use a test ZooKeeper instance which is + * shared for both the local and configuration metadata stores. + * + * @return the builder + */ + public Builder withTestZookeeper() { + return withTestZookeeper(false); + } + + /** + * Configure this PulsarTestContext to use a test ZooKeeper instance. + * + * @param useSeparateGlobalZk if true, the global (configuration) zookeeper will be a separate instance + * @return the builder + */ + public Builder withTestZookeeper(boolean useSeparateGlobalZk) { + try { + testZKServer(createTestZookeeper()); + if (useSeparateGlobalZk) { + testZKServerGlobal(createTestZookeeper()); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + return this; + } + + private TestZKServer createTestZookeeper() throws Exception { + TestZKServer testZKServer = new TestZKServer(); + try (ZooKeeper zkc = new ZooKeeper(testZKServer.getConnectionString(), 5000, event -> { + })) { + initializeZookeeper(zkc); + } + registerCloseable(testZKServer); + return testZKServer; } /** @@ -628,6 +678,20 @@ public class PulsarTestContext implements AutoCloseable { configurationMetadataStore(mockZookeeperMetadataStore); } } + } else if (super.testZKServer != null) { + MetadataStoreExtended testZookeeperMetadataStore = + createTestZookeeperMetadataStore(super.testZKServer, MetadataStoreConfig.METADATA_STORE); + if (super.localMetadataStore == null) { + localMetadataStore(testZookeeperMetadataStore); + } + if (super.configurationMetadataStore == null) { + if (super.testZKServerGlobal != null) { + configurationMetadataStore(createTestZookeeperMetadataStore(super.testZKServerGlobal, + MetadataStoreConfig.CONFIGURATION_METADATA_STORE)); + } else { + configurationMetadataStore(testZookeeperMetadataStore); + } + } } else { try { MetadataStoreExtended store = MetadataStoreFactoryImpl.createExtended("memory:local", @@ -672,6 +736,17 @@ public class PulsarTestContext implements AutoCloseable { return nonClosingProxy; } + @SneakyThrows + private MetadataStoreExtended createTestZookeeperMetadataStore(TestZKServer zkServer, + String metadataStoreName) { + MetadataStoreExtended store = MetadataStoreExtended.create("zk:" + zkServer.getConnectionString(), + MetadataStoreConfig.builder().metadataStoreName(metadataStoreName).build()); + registerCloseable(store); + MetadataStoreExtended nonClosingProxy = + NonClosingProxyHandler.createNonClosingProxy(store, MetadataStoreExtended.class); + return nonClosingProxy; + } + protected abstract void initializePulsarServices(SpyConfig spyConfig, Builder builder); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java index f132aef96bd..4ceb4d20e53 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java @@ -88,6 +88,7 @@ import org.apache.pulsar.common.naming.ServiceUnitId; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.PoliciesUtil; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.pulsar.common.util.SecurityUtility; @@ -108,15 +109,41 @@ import org.awaitility.Awaitility; import org.awaitility.reflect.WhiteboxImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.testng.ITest; +import org.testng.SkipException; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Factory; import org.testng.annotations.Test; @Test(groups = "broker-api") -public class BrokerServiceLookupTest extends ProducerConsumerBase { +public class BrokerServiceLookupTest extends ProducerConsumerBase implements ITest { private static final Logger log = LoggerFactory.getLogger(BrokerServiceLookupTest.class); + private String testName; + + @DataProvider + private static Object[] booleanValues() { + return new Object[]{ true, false }; + } + + @Factory(dataProvider = "booleanValues") + public BrokerServiceLookupTest(boolean useTestZookeeper) { + // when set to true, TestZKServer is used which is a real ZooKeeper implementation + this.useTestZookeeper = useTestZookeeper; + } + + @Override + public String getTestName() { + return testName; + } @BeforeMethod + public void applyTestName(Method method) { + testName = method.getName() + " with " + (useTestZookeeper ? "TestZKServer" : "MockZooKeeper"); + } + + @BeforeMethod(dependsOnMethods = "setTestMethodName") @Override protected void setup() throws Exception { conf.setDefaultNumberOfNamespaceBundles(1); @@ -125,10 +152,43 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase { producerBaseSetup(); } + @Override + protected void doInitConf() throws Exception { + super.doInitConf(); + switch (methodName) { + case "testMultipleBrokerDifferentClusterLookup" -> { + conf.setAuthenticationEnabled(true); + } + case "testWebserviceServiceTls" -> { + // broker1 with tls enabled + conf.setBrokerServicePortTls(Optional.of(0)); + conf.setWebServicePortTls(Optional.of(0)); + conf.setTlsTrustCertsFilePath(CA_CERT_FILE_PATH); + conf.setTlsRequireTrustedClientCertOnConnect(true); + conf.setTlsCertificateFilePath(BROKER_CERT_FILE_PATH); + conf.setTlsKeyFilePath(BROKER_KEY_FILE_PATH); + conf.setNumExecutorThreadPoolSize(5); + // Not in use, and because TLS is not configured, it will fail to start + conf.setSystemTopicEnabled(false); + } + case "testSkipSplitBundleIfOnlyOneBroker" -> { + conf.setDefaultNumberOfNamespaceBundles(1); + conf.setLoadBalancerNamespaceBundleMaxTopics(1); + conf.setLoadManagerClassName(ModularLoadManagerImpl.class.getName()); + } + case "testPartitionedMetadataWithDeprecatedVersion" -> { + conf.setBrokerServicePortTls(Optional.empty()); + conf.setWebServicePortTls(Optional.empty()); + conf.setClientLibraryVersionCheckEnabled(true); + } + } + } + @AfterMethod(alwaysRun = true) @Override protected void cleanup() throws Exception { internalCleanup(); + testName = null; } /** @@ -214,9 +274,11 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase { @Test public void testConcurrentWriteBrokerData() throws Exception { Map<String, NamespaceBundleStats> map = new ConcurrentHashMap<>(); + List<String> boundaries = PoliciesUtil.getBundles(100).getBoundaries(); for (int i = 0; i < 100; i++) { - map.put("key"+ i, new NamespaceBundleStats()); + map.put("my-property/my-ns/" + boundaries.get(i), new NamespaceBundleStats()); } + BrokerService originalBrokerService = pulsar.getBrokerService(); BrokerService brokerService = mock(BrokerService.class); doReturn(brokerService).when(pulsar).getBrokerService(); doReturn(map).when(brokerService).getBundleStats(); @@ -247,6 +309,8 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase { for (Future<?> future : list) { future.get(); } + // allow proper shutdown so that resources aren't leaked + doReturn(originalBrokerService).when(pulsar).getBrokerService(); } /** @@ -294,12 +358,6 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase { @Cleanup PulsarClient pulsarClient2 = PulsarClient.builder().serviceUrl(brokerServiceUrl.toString()).build(); - // enable authorization: so, broker can validate cluster and redirect if finds different cluster - pulsar.getConfiguration().setAuthorizationEnabled(true); - // restart broker with authorization enabled: it initialize AuthorizationService - stopBroker(); - startBroker(); - LoadManager loadManager2 = spy(pulsar2.getLoadManager().get()); Field loadManagerField = NamespaceService.class.getDeclaredField("loadManager"); loadManagerField.setAccessible(true); @@ -336,10 +394,6 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase { consumer.acknowledgeCumulative(msg); consumer.close(); producer.close(); - - // disable authorization - pulsar.getConfiguration().setAuthorizationEnabled(false); - loadManager2 = null; } /** @@ -457,18 +511,6 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase { PulsarTestContext pulsarTestContext2 = createAdditionalPulsarTestContext(conf2); PulsarService pulsar2 = pulsarTestContext2.getPulsarService(); - // restart broker1 with tls enabled - conf.setBrokerServicePortTls(Optional.of(0)); - conf.setWebServicePortTls(Optional.of(0)); - conf.setTlsTrustCertsFilePath(CA_CERT_FILE_PATH); - conf.setTlsRequireTrustedClientCertOnConnect(true); - conf.setTlsCertificateFilePath(BROKER_CERT_FILE_PATH); - conf.setTlsKeyFilePath(BROKER_KEY_FILE_PATH); - conf.setNumExecutorThreadPoolSize(5); - // Not in use, and because TLS is not configured, it will fail to start - conf.setSystemTopicEnabled(false); - stopBroker(); - startBroker(); pulsar.getLoadManager().get().writeLoadReportOnZookeeper(); pulsar2.getLoadManager().get().writeLoadReportOnZookeeper(); @@ -672,11 +714,6 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase { conf2.setLoadBalancerAutoUnloadSplitBundlesEnabled(true); conf2.setLoadBalancerNamespaceBundleMaxTopics(1); - // configure broker-1 with ModularLoadManager - stopBroker(); - conf.setLoadManagerClassName(ModularLoadManagerImpl.class.getName()); - startBroker(); - @Cleanup PulsarTestContext pulsarTestContext2 = createAdditionalPulsarTestContext(conf2); PulsarService pulsar2 = pulsarTestContext2.getPulsarService(); @@ -794,12 +831,6 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase { final String topicName1 = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/tp_"); final String topicName2 = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/tp_"); try { - // configure broker with ModularLoadManager. - stopBroker(); - conf.setDefaultNumberOfNamespaceBundles(1); - conf.setLoadBalancerNamespaceBundleMaxTopics(1); - conf.setLoadManagerClassName(ModularLoadManagerImpl.class.getName()); - startBroker(); final ModularLoadManagerWrapper modularLoadManagerWrapper = (ModularLoadManagerWrapper) pulsar.getLoadManager().get(); final ModularLoadManagerImpl modularLoadManager = @@ -952,12 +983,6 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase { admin.namespaces().createNamespace(property + "/" + cluster + "/" + namespace); admin.topics().createPartitionedTopic(dest.toString(), totalPartitions); - stopBroker(); - conf.setBrokerServicePortTls(Optional.empty()); - conf.setWebServicePortTls(Optional.empty()); - conf.setClientLibraryVersionCheckEnabled(true); - startBroker(); - URI brokerServiceUrl = new URI(pulsar.getSafeWebServiceAddress()); URL url = brokerServiceUrl.toURL(); @@ -1116,6 +1141,9 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase { @Test public void testLookupConnectionNotCloseIfGetUnloadingExOrMetadataEx() throws Exception { + if (useTestZookeeper) { + throw new SkipException("This test case depends on MockZooKeeper"); + } String tpName = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); admin.topics().createNonPartitionedTopic(tpName); PulsarClientImpl pulsarClientImpl = (PulsarClientImpl) pulsarClient; @@ -1211,7 +1239,8 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase { } } - @Test(timeOut = 30000) + // TODO: This test is disabled since it's invalid. The test fails for both TestZKServer and MockZooKeeper. + @Test(timeOut = 30000, enabled = false) public void testLookupConnectionNotCloseIfFailedToAcquireOwnershipOfBundle() throws Exception { String tpName = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); admin.topics().createNonPartitionedTopic(tpName); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerConsumerBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerConsumerBase.java index ef070250ca1..295120bf369 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerConsumerBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerConsumerBase.java @@ -40,7 +40,7 @@ public abstract class ProducerConsumerBase extends MockedPulsarServiceBaseTest { protected String methodName; @BeforeMethod(alwaysRun = true) - public void beforeMethod(Method m) throws Exception { + public void setTestMethodName(Method m) throws Exception { methodName = m.getName(); } diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java index 78e9980bc21..21128c77477 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java @@ -71,9 +71,10 @@ import org.apache.zookeeper.client.ConnectStringParser; @Slf4j public class ZKMetadataStore extends AbstractBatchedMetadataStore implements MetadataStoreExtended, MetadataStoreLifecycle { - public static final String ZK_SCHEME = "zk"; public static final String ZK_SCHEME_IDENTIFIER = "zk:"; + // ephemeralOwner value for persistent nodes + private static final long NOT_EPHEMERAL = 0L; private final String zkConnectString; private final String rootPath; @@ -128,12 +129,17 @@ public class ZKMetadataStore extends AbstractBatchedMetadataStore @VisibleForTesting @SneakyThrows public ZKMetadataStore(ZooKeeper zkc, MetadataStoreConfig config) { - super(config); + this(zkc, config, false); + } + @VisibleForTesting + @SneakyThrows + public ZKMetadataStore(ZooKeeper zkc, MetadataStoreConfig config, boolean isZkManaged) { + super(config); this.zkConnectString = null; this.rootPath = null; this.metadataStoreConfig = null; - this.isZkManaged = false; + this.isZkManaged = isZkManaged; this.zkc = zkc; this.sessionWatcher = new ZKSessionWatcher(zkc, this::receivedSessionEvent); zkc.addWatch("/", this::handleWatchEvent, AddWatchMode.PERSISTENT_RECURSIVE); @@ -477,7 +483,7 @@ public class ZKMetadataStore extends AbstractBatchedMetadataStore private Stat getStat(String path, org.apache.zookeeper.data.Stat zkStat) { return new Stat(path, zkStat.getVersion(), zkStat.getCtime(), zkStat.getMtime(), - zkStat.getEphemeralOwner() != -1, + zkStat.getEphemeralOwner() != NOT_EPHEMERAL, zkStat.getEphemeralOwner() == zkc.getSessionId()); } diff --git a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/BookKeeperClusterTestCase.java b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/BookKeeperClusterTestCase.java index 9a8e3ef5a2d..1e861bbf88c 100644 --- a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/BookKeeperClusterTestCase.java +++ b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/BookKeeperClusterTestCase.java @@ -207,6 +207,7 @@ public abstract class BookKeeperClusterTestCase { try { // cleanup for metrics. metadataStore.close(); + metadataStore = null; stopZKCluster(); } catch (Exception e) { LOG.error("Got Exception while trying to stop ZKCluster", e); @@ -236,6 +237,9 @@ public abstract class BookKeeperClusterTestCase { protected void startZKCluster() throws Exception { zkUtil.startCluster(); zkc = zkUtil.getZooKeeperClient(); + if (metadataStore != null) { + metadataStore.close(); + } metadataStore = new FaultInjectionMetadataStore( MetadataStoreExtended.create(zkUtil.getZooKeeperConnectString(), MetadataStoreConfig.builder().metadataStoreName("metastore-" + getClass().getSimpleName()).build())); diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java index c419f612735..c4bd1959acc 100644 --- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java @@ -24,11 +24,18 @@ import io.etcd.jetcd.launcher.EtcdCluster; import io.etcd.jetcd.test.EtcdClusterExtension; import java.io.File; import java.net.URI; +import java.util.Arrays; +import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletionException; import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.Collectors; +import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.metadata.api.MetadataStore; +import org.apache.pulsar.metadata.api.MetadataStoreConfig; +import org.apache.pulsar.metadata.api.MetadataStoreFactory; +import org.apache.pulsar.metadata.impl.MetadataStoreFactoryImpl; import org.apache.pulsar.tests.TestRetrySupport; import org.assertj.core.util.Files; import org.testng.annotations.AfterClass; @@ -36,19 +43,45 @@ import org.testng.annotations.BeforeClass; import org.testng.annotations.DataProvider; public abstract class BaseMetadataStoreTest extends TestRetrySupport { + // to debug specific implementations, set the TEST_METADATA_PROVIDERS environment variable + // or temporarily hard code this value in the test class before running tests in the IDE + // supported values are ZooKeeper,Memory,RocksDB,Etcd,Oxia,MockZooKeeper + private static final String TEST_METADATA_PROVIDERS = System.getenv("TEST_METADATA_PROVIDERS"); + private static String originalMetadatastoreProvidersPropertyValue; protected TestZKServer zks; protected EtcdCluster etcdCluster; - + private String mockZkUrl; + // reference to keep the MockZooKeeper instance alive in MockZookeeperMetadataStoreProvider + private MetadataStore mockZkStoreRef; + private String zksConnectionString; + private String memoryConnectionString; + private String rocksdbConnectionString; + private File rocksDbDirectory; + private boolean running; @BeforeClass(alwaysRun = true) @Override public void setup() throws Exception { + running = true; incrementSetupNumber(); zks = new TestZKServer(); + zksConnectionString = zks.getConnectionString(); + memoryConnectionString = "memory:" + UUID.randomUUID(); + rocksDbDirectory = Files.newTemporaryFolder().getAbsoluteFile(); + rocksdbConnectionString = "rocksdb:" + rocksDbDirectory; + originalMetadatastoreProvidersPropertyValue = + System.getProperty(MetadataStoreFactoryImpl.METADATASTORE_PROVIDERS_PROPERTY); + // register MockZooKeeperMetadataStoreProvider + System.setProperty(MetadataStoreFactoryImpl.METADATASTORE_PROVIDERS_PROPERTY, + MockZooKeeperMetadataStoreProvider.class.getName()); + mockZkUrl = "mock-zk:" + UUID.randomUUID(); + // create a reference in MockZooKeeperMetadataStoreProvider to keep the MockZooKeeper instance alive + mockZkStoreRef = MetadataStoreFactory.create(mockZkUrl, MetadataStoreConfig.builder().build()); } @AfterClass(alwaysRun = true) @Override public void cleanup() throws Exception { + running = false; markCurrentSetupNumberCleaned(); if (zks != null) { zks.close(); @@ -59,30 +92,71 @@ public abstract class BaseMetadataStoreTest extends TestRetrySupport { etcdCluster.close(); etcdCluster = null; } - } - private static String createTempFolder() { - File temp = Files.newTemporaryFolder(); - temp.deleteOnExit(); - return temp.getAbsolutePath(); + if (mockZkStoreRef != null) { + mockZkStoreRef.close(); + mockZkStoreRef = null; + mockZkUrl = null; + } + + if (rocksDbDirectory != null) { + Files.delete(rocksDbDirectory); + rocksDbDirectory = null; + } + + if (originalMetadatastoreProvidersPropertyValue != null) { + System.setProperty(MetadataStoreFactoryImpl.METADATASTORE_PROVIDERS_PROPERTY, + originalMetadatastoreProvidersPropertyValue); + } else { + System.clearProperty(MetadataStoreFactoryImpl.METADATASTORE_PROVIDERS_PROPERTY); + } } @DataProvider(name = "impl") public Object[][] implementations() { + // If the environment variable TEST_METADATA_PROVIDERS is set, only run the specified implementations + if (StringUtils.isNotBlank(TEST_METADATA_PROVIDERS)) { + return filterImplementations(TEST_METADATA_PROVIDERS.split(",")); + } + return allImplementations(); + } + + private Object[][] allImplementations() { // A Supplier<String> must be used for the Zookeeper connection string parameter. The retried test run will // use the same arguments as the failed attempt. // The Zookeeper test server gets restarted by TestRetrySupport before the retry. // The new connection string won't be available to the test method unless a // Supplier<String> lambda is used for providing the value. return new Object[][]{ - {"ZooKeeper", stringSupplier(() -> zks.getConnectionString())}, - {"Memory", stringSupplier(() -> "memory:" + UUID.randomUUID())}, - {"RocksDB", stringSupplier(() -> "rocksdb:" + createTempFolder())}, + {"ZooKeeper", stringSupplier(() -> zksConnectionString)}, + {"Memory", stringSupplier(() -> memoryConnectionString)}, + {"RocksDB", stringSupplier(() -> rocksdbConnectionString)}, {"Etcd", stringSupplier(() -> "etcd:" + getEtcdClusterConnectString())}, + {"MockZooKeeper", stringSupplier(() -> mockZkUrl)}, }; } + @DataProvider(name = "distributedImpl") + public Object[][] distributedImplementations() { + return filterImplementations("ZooKeeper", "Etcd"); + } + + @DataProvider(name = "zkImpls") + public Object[][] zkImplementations() { + return filterImplementations("ZooKeeper", "MockZooKeeper"); + } + + protected Object[][] filterImplementations(String... providers) { + Set<String> providersSet = Set.of(providers); + return Arrays.stream(allImplementations()) + .filter(impl -> providersSet.contains(impl[0])) + .toArray(Object[][]::new); + } + private synchronized String getEtcdClusterConnectString() { + if (!running) { + return null; + } if (etcdCluster == null) { etcdCluster = EtcdClusterExtension.builder().withClusterName("test").withNodes(1).withSsl(false).build() .cluster(); @@ -92,7 +166,26 @@ public abstract class BaseMetadataStoreTest extends TestRetrySupport { } public static Supplier<String> stringSupplier(Supplier<String> supplier) { - return supplier; + return new StringSupplier(supplier); + } + + // Implements toString() so that the test name is more descriptive + private static class StringSupplier implements Supplier<String> { + private final Supplier<String> supplier; + + public StringSupplier(Supplier<String> supplier) { + this.supplier = supplier; + } + + @Override + public String get() { + return supplier.get(); + } + + @Override + public String toString() { + return get(); + } } protected String newKey() { @@ -138,4 +231,15 @@ public abstract class BaseMetadataStoreTest extends TestRetrySupport { } return false; } + + /** + * Delete all the empty container nodes + * @param provider the metadata store provider + * @throws Exception + */ + protected void maybeTriggerDeletingEmptyContainers(String provider) throws Exception { + if ("ZooKeeper".equals(provider) && zks != null) { + zks.checkContainers(); + } + } } diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/CounterTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/CounterTest.java index c5b4012f0c8..bd068539cc5 100644 --- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/CounterTest.java +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/CounterTest.java @@ -70,6 +70,7 @@ public class CounterTest extends BaseMetadataStoreTest { return; } String metadataUrl = urlSupplier.get(); + @Cleanup MetadataStoreExtended store1 = MetadataStoreExtended.create(metadataUrl, MetadataStoreConfig.builder().build()); CoordinationService cs1 = new CoordinationServiceImpl(store1); @@ -85,7 +86,7 @@ public class CounterTest extends BaseMetadataStoreTest { store1.close(); // Delete all the empty container nodes - zks.checkContainers(); + maybeTriggerDeletingEmptyContainers(provider); @Cleanup MetadataStoreExtended store2 = MetadataStoreExtended.create(metadataUrl, MetadataStoreConfig.builder().build()); diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java index 591c0a23a9b..14fe3d35325 100644 --- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java @@ -69,7 +69,6 @@ import org.apache.pulsar.metadata.api.Stat; import org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl; import org.awaitility.Awaitility; import org.mockito.stubbing.Answer; -import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @Slf4j @@ -111,14 +110,7 @@ public class MetadataCacheTest extends BaseMetadataStoreTest { } } - @DataProvider(name = "zk") - public Object[][] zkimplementations() { - return new Object[][] { - { "ZooKeeper", stringSupplier(() -> zks.getConnectionString()) }, - }; - } - - @Test(dataProvider = "zk") + @Test(dataProvider = "zkImpls") public void crossStoreAddDelete(String provider, Supplier<String> urlSupplier) throws Exception { @Cleanup MetadataStore store1 = MetadataStoreFactory.create(urlSupplier.get(), MetadataStoreConfig.builder().build()); @@ -183,7 +175,7 @@ public class MetadataCacheTest extends BaseMetadataStoreTest { }); } - @Test(dataProvider = "zk") + @Test(dataProvider = "zkImpls") public void crossStoreUpdates(String provider, Supplier<String> urlSupplier) throws Exception { String testName = "cross store updates"; @Cleanup @@ -493,11 +485,10 @@ public class MetadataCacheTest extends BaseMetadataStoreTest { * * @throws Exception */ - @Test - public void readModifyUpdateBadVersionRetry() throws Exception { - String url = zks.getConnectionString(); + @Test(dataProvider = "zkImpls") + public void readModifyUpdateBadVersionRetry(String provider, Supplier<String> urlSupplier) throws Exception { @Cleanup - MetadataStore store = MetadataStoreFactory.create(url, MetadataStoreConfig.builder().build()); + MetadataStore store = MetadataStoreFactory.create(urlSupplier.get(), MetadataStoreConfig.builder().build()); MetadataCache<MyClass> cache = store.getMetadataCache(MyClass.class); @@ -511,7 +502,8 @@ public class MetadataCacheTest extends BaseMetadataStoreTest { final var sourceStores = new ArrayList<MetadataStore>(); for (int i = 0; i < 20; i++) { - final var sourceStore = MetadataStoreFactory.create(url, MetadataStoreConfig.builder().build()); + final var sourceStore = + MetadataStoreFactory.create(urlSupplier.get(), MetadataStoreConfig.builder().build()); sourceStores.add(sourceStore); final var objCache = sourceStore.getMetadataCache(MyClass.class); futures.add(objCache.readModifyUpdate(key1, v -> new MyClass(v.a, v.b + 1))); @@ -522,11 +514,10 @@ public class MetadataCacheTest extends BaseMetadataStoreTest { } } - @Test - public void readModifyUpdateOrCreateRetryTimeout() throws Exception { - String url = zks.getConnectionString(); + @Test(dataProvider = "zkImpls") + public void readModifyUpdateOrCreateRetryTimeout(String provider, Supplier<String> urlSupplier) throws Exception { @Cleanup - MetadataStore store = MetadataStoreFactory.create(url, MetadataStoreConfig.builder().build()); + MetadataStore store = MetadataStoreFactory.create(urlSupplier.get(), MetadataStoreConfig.builder().build()); MetadataCache<MyClass> cache = store.getMetadataCache(MyClass.class, MetadataCacheConfig.builder() .retryBackoff(new BackoffBuilder() diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreExtendedTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreExtendedTest.java index a4c937611fd..30fbd9b836e 100644 --- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreExtendedTest.java +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreExtendedTest.java @@ -19,6 +19,7 @@ package org.apache.pulsar.metadata; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; @@ -70,18 +71,19 @@ public class MetadataStoreExtendedTest extends BaseMetadataStoreTest { @Test(dataProvider = "impl") public void testPersistentOrEphemeralPut(String provider, Supplier<String> urlSupplier) throws Exception { final String key1 = newKey(); + @Cleanup MetadataStoreExtended store = MetadataStoreExtended.create(urlSupplier.get(), MetadataStoreConfig.builder().build()); store.put(key1, "value-1".getBytes(), Optional.empty(), EnumSet.noneOf(CreateOption.class)).join(); var value = store.get(key1).join().get(); assertEquals(value.getValue(), "value-1".getBytes()); - // assertFalse(value.getStat().isEphemeral()); // Todo : fix zkStat.getEphemeralOwner() != 0 from test zk + assertFalse(value.getStat().isEphemeral()); assertTrue(value.getStat().isFirstVersion()); var version = value.getStat().getVersion(); store.put(key1, "value-2".getBytes(), Optional.empty(), EnumSet.noneOf(CreateOption.class)).join(); value = store.get(key1).join().get(); assertEquals(value.getValue(), "value-2".getBytes()); - //assertFalse(value.getStat().isEphemeral()); // Todo : fix zkStat.getEphemeralOwner() != 0 from test zk + assertFalse(value.getStat().isEphemeral()); assertEquals(value.getStat().getVersion(), version + 1); final String key2 = newKey(); diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java index c87e9bda436..7b60a6be581 100644 --- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.metadata; +import static org.assertj.core.api.Assertions.assertThat; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; @@ -54,6 +55,7 @@ import org.apache.pulsar.metadata.api.Stat; import org.apache.pulsar.metadata.impl.ZKMetadataStore; import org.assertj.core.util.Lists; import org.awaitility.Awaitility; +import org.testng.SkipException; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -97,7 +99,7 @@ public class MetadataStoreTest extends BaseMetadataStoreTest { MetadataStoreConfig.builder().fsyncEnable(false).build()); String data = "data"; - String path = "/non-existing-key"; + String path = "/concurrentPutTest"; int concurrent = 50; List<CompletableFuture<Stat>> futureList = new ArrayList<>(); for (int i = 0; i < concurrent; i++) { @@ -400,6 +402,10 @@ public class MetadataStoreTest extends BaseMetadataStoreTest { @Test(dataProvider = "impl") public void testDeleteUnusedDirectories(String provider, Supplier<String> urlSupplier) throws Exception { + if (provider.equals("MockZooKeeper")) { + throw new SkipException("MockZooKeeper does not support deleteUnusedDirectories"); + } + @Cleanup MetadataStore store = MetadataStoreFactory.create(urlSupplier.get(), MetadataStoreConfig.builder().fsyncEnable(false).build()); @@ -413,18 +419,18 @@ public class MetadataStoreTest extends BaseMetadataStoreTest { store.delete(prefix + "/a1/b1/c1", Optional.empty()).join(); store.delete(prefix + "/a1/b1/c2", Optional.empty()).join(); - zks.checkContainers(); + maybeTriggerDeletingEmptyContainers(provider); assertFalse(store.exists(prefix + "/a1/b1").join()); store.delete(prefix + "/a1/b2/c1", Optional.empty()).join(); - zks.checkContainers(); + maybeTriggerDeletingEmptyContainers(provider); assertFalse(store.exists(prefix + "/a1/b2").join()); - zks.checkContainers(); + maybeTriggerDeletingEmptyContainers(provider); assertFalse(store.exists(prefix + "/a1").join()); - zks.checkContainers(); + maybeTriggerDeletingEmptyContainers(provider); assertFalse(store.exists(prefix).join()); } @@ -602,21 +608,25 @@ public class MetadataStoreTest extends BaseMetadataStoreTest { store.put("/b/c/b/1", "value1".getBytes(StandardCharsets.UTF_8), Optional.empty()).join(); List<String> subPaths = store.getChildren("/").get(); - Set<String> expectedSet = "ZooKeeper".equals(provider) ? Set.of("a", "b", "zookeeper") : Set.of("a", "b"); + Set<String> ignoredRootPaths = Set.of("zookeeper"); + Set<String> expectedSet = Set.of("a", "b"); for (String subPath : subPaths) { - assertTrue(expectedSet.contains(subPath)); + if (ignoredRootPaths.contains(subPath)) { + continue; + } + assertThat(expectedSet).contains(subPath); } List<String> subPaths2 = store.getChildren("/a").get(); Set<String> expectedSet2 = Set.of("a-1", "a-2"); for (String subPath : subPaths2) { - assertTrue(expectedSet2.contains(subPath)); + assertThat(expectedSet2).contains(subPath); } List<String> subPaths3 = store.getChildren("/b").get(); Set<String> expectedSet3 = Set.of("c"); for (String subPath : subPaths3) { - assertTrue(expectedSet3.contains(subPath)); + assertThat(expectedSet3).contains(subPath); } } diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MockZooKeeperMetadataStoreProvider.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MockZooKeeperMetadataStoreProvider.java new file mode 100644 index 00000000000..994a97c2b10 --- /dev/null +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MockZooKeeperMetadataStoreProvider.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.metadata; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import org.apache.pulsar.metadata.api.MetadataStore; +import org.apache.pulsar.metadata.api.MetadataStoreConfig; +import org.apache.pulsar.metadata.api.MetadataStoreException; +import org.apache.pulsar.metadata.api.MetadataStoreProvider; +import org.apache.pulsar.metadata.impl.ZKMetadataStore; +import org.apache.zookeeper.MockZooKeeper; +import org.apache.zookeeper.MockZooKeeperSession; + +public class MockZooKeeperMetadataStoreProvider implements MetadataStoreProvider { + private static final String MOCK_ZK_SCHEME = "mock-zk"; + private static final ConcurrentMap<String, MockZooKeeper> mockZooKeepers = new ConcurrentHashMap<>(); + + @Override + public String urlScheme() { + return MOCK_ZK_SCHEME; + } + + @Override + public MetadataStore create(String metadataURL, MetadataStoreConfig metadataStoreConfig, + boolean enableSessionWatcher) throws MetadataStoreException { + MockZooKeeper mockZooKeeper = mockZooKeepers.computeIfAbsent(metadataURL, + k -> MockZooKeeper.newInstance().registerCloseable(() -> mockZooKeepers.remove(k))); + MockZooKeeperSession mockZooKeeperSession = MockZooKeeperSession.newInstance(mockZooKeeper, true); + ZKMetadataStore zkMetadataStore = new ZKMetadataStore(mockZooKeeperSession, metadataStoreConfig, true); + return zkMetadataStore; + } +} diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/LedgerManagerIteratorTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/LedgerManagerIteratorTest.java index b64cc964a99..bf67d0218b0 100644 --- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/LedgerManagerIteratorTest.java +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/LedgerManagerIteratorTest.java @@ -373,7 +373,7 @@ public class LedgerManagerIteratorTest extends BaseMetadataStoreTest { assertEquals(ledgersReadAsync, ids, "Comparing LedgersIds read asynchronously"); } - @Test(timeOut = 30000, dataProvider = "impl") + @Test(timeOut = 60000, dataProvider = "impl") public void checkConcurrentModifications(String provider, Supplier<String> urlSupplier) throws Throwable { @Cleanup MetadataStoreExtended store = @@ -406,14 +406,16 @@ public class LedgerManagerIteratorTest extends BaseMetadataStoreTest { ExecutorService executor = Executors.newCachedThreadPool(); final ConcurrentSkipListSet<Long> createdLedgers = new ConcurrentSkipListSet<>(); for (int i = 0; i < numWriters; ++i) { + int writerIndex = i; Future<?> f = executor.submit(() -> { @Cleanup LedgerManager writerLM = new PulsarLedgerManager(store, ledgersRoot); Random writerRNG = new Random(rng.nextLong()); - + log.info("Writer {} waiting", writerIndex); latch.await(); - + log.info("Writer {} started", writerIndex); while (MathUtils.elapsedNanos(start) < runtime) { + log.info("Writer {} writing", writerIndex); long candidate = 0; do { candidate = Math.abs(writerRNG.nextLong()); @@ -425,18 +427,22 @@ public class LedgerManagerIteratorTest extends BaseMetadataStoreTest { createLedger(writerLM, candidate); removeLedger(writerLM, candidate); } + log.info("Writer {} finished", writerIndex); return null; }); futures.add(f); } for (int i = 0; i < numCheckers; ++i) { + int checkerIndex = i; Future<?> f = executor.submit(() -> { @Cleanup LedgerManager checkerLM = new PulsarLedgerManager(store, ledgersRoot); + log.info("Checker {} waiting", checkerIndex); latch.await(); - + log.info("Checker {} started", checkerIndex); while (MathUtils.elapsedNanos(start) < runtime) { + log.info("Checker {} checking", checkerIndex); LedgerRangeIterator lri = checkerLM.getLedgerRanges(0); Set<Long> returnedIds = ledgerRangeToSet(lri); for (long id : mustExist) { @@ -448,15 +454,19 @@ public class LedgerManagerIteratorTest extends BaseMetadataStoreTest { assertTrue(ledgersReadAsync.contains(id)); } } + log.info("Checker {} finished", checkerIndex); return null; }); futures.add(f); } latch.countDown(); + log.info("Waiting for futures"); for (Future<?> f : futures) { + log.info("Waiting for future"); f.get(); } + log.info("Completed"); executor.shutdownNow(); } diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/LedgerUnderreplicationManagerTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/LedgerUnderreplicationManagerTest.java index 0e9c781fb91..ac73491a81c 100644 --- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/LedgerUnderreplicationManagerTest.java +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/LedgerUnderreplicationManagerTest.java @@ -300,10 +300,10 @@ public class LedgerUnderreplicationManagerTest extends BaseMetadataStoreTest { assertEquals(l, lB.get(), "Should be the ledger I marked"); } - - @Test(timeOut = 10000) - public void testZkMetasStoreMarkReplicatedDeleteEmptyParentNodes() throws Exception { - methodSetup(stringSupplier(() -> zks.getConnectionString())); + @Test(dataProvider = "zkImpls", timeOut = 10000) + public void testZkMetasStoreMarkReplicatedDeleteEmptyParentNodes(String provider, Supplier<String> urlSupplier) + throws Exception { + methodSetup(urlSupplier); String missingReplica = "localhost:3181"; diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerIdGeneratorTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerIdGeneratorTest.java index 73d5f451c1f..da3fd7f7bd4 100644 --- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerIdGeneratorTest.java +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerIdGeneratorTest.java @@ -242,7 +242,7 @@ public class PulsarLedgerIdGeneratorTest extends BaseMetadataStoreTest { l1.await(); log.info("res1 : {}", res1); - zks.checkContainers(); + maybeTriggerDeletingEmptyContainers(provider); CountDownLatch l2 = new CountDownLatch(1); AtomicLong res2 = new AtomicLong(); diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java index c0159be4303..34e860aa578 100644 --- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java @@ -20,6 +20,10 @@ package org.apache.pulsar.metadata.impl; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; +import java.util.EnumSet; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; import lombok.Cleanup; import org.apache.pulsar.metadata.api.GetResult; import org.apache.pulsar.metadata.api.MetadataStore; @@ -31,26 +35,25 @@ import org.apache.pulsar.metadata.api.extended.CreateOption; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; -import java.util.EnumSet; -import java.util.List; -import java.util.Optional; -import java.util.concurrent.CompletableFuture; public class MetadataStoreFactoryImplTest { - - private static Object originalProperty; + private static String originalMetadatastoreProvidersPropertyValue; @BeforeClass public void setMetadataStoreProperty() { - originalProperty = System.getProperties().get(MetadataStoreFactoryImpl.METADATASTORE_PROVIDERS_PROPERTY); + originalMetadatastoreProvidersPropertyValue = + System.getProperty(MetadataStoreFactoryImpl.METADATASTORE_PROVIDERS_PROPERTY); System.setProperty(MetadataStoreFactoryImpl.METADATASTORE_PROVIDERS_PROPERTY, MyMetadataStoreProvider.class.getName()); } @AfterClass public void resetMetadataStoreProperty() { - if (originalProperty != null) { - System.getProperties().put(MetadataStoreFactoryImpl.METADATASTORE_PROVIDERS_PROPERTY, originalProperty); + if (originalMetadatastoreProvidersPropertyValue != null) { + System.setProperty(MetadataStoreFactoryImpl.METADATASTORE_PROVIDERS_PROPERTY, + originalMetadatastoreProvidersPropertyValue); + } else { + System.clearProperty(MetadataStoreFactoryImpl.METADATASTORE_PROVIDERS_PROPERTY); } } diff --git a/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeper.java b/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeper.java index f32036e53f0..e124699ee13 100644 --- a/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeper.java +++ b/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeper.java @@ -19,34 +19,32 @@ package org.apache.zookeeper; import com.google.common.collect.HashMultimap; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import com.google.common.collect.Multimaps; import com.google.common.collect.SetMultimap; import com.google.common.collect.Sets; +import com.google.common.util.concurrent.MoreExecutors; import io.netty.util.concurrent.DefaultThreadFactory; import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.TreeMap; -import java.util.TreeSet; +import java.util.concurrent.Callable; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; import java.util.function.BiPredicate; import lombok.AllArgsConstructor; -import lombok.Data; -import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.zookeeper.AsyncCallback.Children2Callback; import org.apache.zookeeper.AsyncCallback.ChildrenCallback; import org.apache.zookeeper.AsyncCallback.DataCallback; @@ -57,6 +55,8 @@ import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Stat; +import org.apache.zookeeper.proto.DeleteRequest; +import org.apache.zookeeper.proto.SetDataRequest; import org.objenesis.Objenesis; import org.objenesis.ObjenesisStd; import org.objenesis.instantiator.ObjectInstantiator; @@ -64,33 +64,79 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class MockZooKeeper extends ZooKeeper { - @Data + // ephemeralOwner value for persistent nodes + private static final long NOT_EPHEMERAL = 0L; + private static final String ROOT_PATH = "/"; + @AllArgsConstructor private static class MockZNode { byte[] content; int version; long ephemeralOwner; + long creationTimestamp; + long modificationTimestamp; + List<String> children; static MockZNode of(byte[] content, int version, long ephemeralOwner) { - return new MockZNode(content, version, ephemeralOwner); + return new MockZNode(content, version, ephemeralOwner, System.currentTimeMillis(), + System.currentTimeMillis(), new ArrayList<>()); + } + + public void updateVersion() { + version++; + modificationTimestamp = System.currentTimeMillis(); + } + + public void updateData(byte[] data) { + content = data; + updateVersion(); + } + + public Stat getStat() { + return applyToStat(new Stat()); + } + + public Stat applyToStat(Stat stat) { + stat.setCtime(creationTimestamp); + stat.setMtime(modificationTimestamp); + stat.setVersion(version); + stat.setEphemeralOwner(ephemeralOwner); + return stat; + } + + public int getVersion() { + return version; + } + + public byte[] getContent() { + return content; + } + + public long getEphemeralOwner() { + return ephemeralOwner; + } + + public List<String> getChildren() { + return children; } } private TreeMap<String, MockZNode> tree; - private SetMultimap<String, Watcher> watchers; - private volatile boolean stopped; + private SetMultimap<String, NodeWatcher> watchers; + private AtomicBoolean stopped; private AtomicReference<KeeperException.Code> alwaysFail; private CopyOnWriteArrayList<Failure> failures; private ExecutorService executor; - private Watcher sessionWatcher; - private long sessionId = 0L; + private volatile Watcher sessionWatcher; + private long sessionId = 1L; private int readOpDelayMs; - private ReentrantLock mutex; - private AtomicLong sequentialIdGenerator; - private ThreadLocal<Long> epheralOwnerThreadLocal; + private ThreadLocal<Long> overriddenSessionIdThreadLocal; + private ThreadLocal<Boolean> inExecutorThreadLocal; + private int referenceCount; + private List<AutoCloseable> closeables; //see details of Objenesis caching - http://objenesis.org/details.html //see supported jvms - https://github.com/easymock/objenesis/blob/master/SupportedJVMs.md @@ -110,41 +156,21 @@ public class MockZooKeeper extends ZooKeeper { } } - @Data - @AllArgsConstructor - private static class PersistentWatcher { - final String path; - final Watcher watcher; - final AddWatchMode mode; + private record PersistentWatcher(String path, Watcher watcher, AddWatchMode mode, long sessionId) { } - private List<PersistentWatcher> persistentWatchers; - - public static MockZooKeeper newInstance() { - return newInstance(null); + private record NodeWatcher(Watcher watcher, long sessionId) { } - public static MockZooKeeper newInstance(ExecutorService executor) { - return newInstance(executor, -1); - } - - public static MockZooKeeper newInstanceForGlobalZK(ExecutorService executor) { - return newInstanceForGlobalZK(executor, -1); - } + private List<PersistentWatcher> persistentWatchers; - public static MockZooKeeper newInstanceForGlobalZK(ExecutorService executor, int readOpDelayMs) { - try { - return createMockZooKeeperInstance(executor, readOpDelayMs); - } catch (RuntimeException e) { - throw e; - } catch (Exception e) { - throw new IllegalStateException("Cannot create object", e); - } + public static MockZooKeeper newInstance() { + return newInstance(-1); } - public static MockZooKeeper newInstance(ExecutorService executor, int readOpDelayMs) { + public static MockZooKeeper newInstance(int readOpDelayMs) { try { - return createMockZooKeeperInstance(executor, readOpDelayMs); + return createMockZooKeeperInstance(readOpDelayMs); } catch (RuntimeException e) { throw e; } catch (Exception e) { @@ -152,29 +178,25 @@ public class MockZooKeeper extends ZooKeeper { } } - private static MockZooKeeper createMockZooKeeperInstance(ExecutorService executor, int readOpDelayMs) { + private static MockZooKeeper createMockZooKeeperInstance(int readOpDelayMs) { ObjectInstantiator<MockZooKeeper> mockZooKeeperInstantiator = objenesis.getInstantiatorOf(MockZooKeeper.class); MockZooKeeper zk = mockZooKeeperInstantiator.newInstance(); - zk.epheralOwnerThreadLocal = new ThreadLocal<>(); - zk.init(executor); + zk.overriddenSessionIdThreadLocal = new ThreadLocal<>(); + zk.inExecutorThreadLocal = ThreadLocal.withInitial(() -> false); + zk.init(); zk.readOpDelayMs = readOpDelayMs; - zk.mutex = new ReentrantLock(); - zk.lockInstance = ThreadLocal.withInitial(zk::createLock); zk.sequentialIdGenerator = new AtomicLong(); + zk.closeables = new ArrayList<>(); return zk; } - private void init(ExecutorService executor) { + private void init() { tree = Maps.newTreeMap(); - if (executor != null) { - this.executor = executor; - } else { - this.executor = Executors.newFixedThreadPool(1, new DefaultThreadFactory("mock-zookeeper")); - } - SetMultimap<String, Watcher> w = HashMultimap.create(); - watchers = Multimaps.synchronizedSetMultimap(w); - stopped = false; + tree.put(ROOT_PATH, MockZNode.of(new byte[0], 0, NOT_EPHEMERAL)); + this.executor = Executors.newSingleThreadExecutor(new DefaultThreadFactory("mock-zookeeper")); + watchers = HashMultimap.create(); + stopped = new AtomicBoolean(false); alwaysFail = new AtomicReference<>(KeeperException.Code.OK); failures = new CopyOnWriteArrayList<>(); persistentWatchers = new ArrayList<>(); @@ -197,101 +219,143 @@ public class MockZooKeeper extends ZooKeeper { return States.CONNECTED; } + @Override + public void register(Watcher watcher) { + sessionWatcher = watcher; + } - @Slf4j - private static class SingleAcquireAndReleaseLock { - private final AtomicBoolean acquired = new AtomicBoolean(false); - private final Lock lock; + @Override + public String create(String path, byte[] data, List<ACL> acl, CreateMode createMode) + throws KeeperException, InterruptedException { + return runInExecutorReturningValue(() -> internalCreate(path, data, createMode)); + } - SingleAcquireAndReleaseLock(Lock lock) { - this.lock = lock; + private <T> T runInExecutorReturningValue(Callable<T> task) + throws InterruptedException, KeeperException { + if (isStopped()) { + throw new KeeperException.ConnectionLossException(); } - - public void lock() { - if (acquired.compareAndSet(false, true)) { - lock.lock(); - } else { - throw new IllegalStateException("Lock was already acquired!"); + if (inExecutorThreadLocal.get()) { + try { + return task.call(); + } catch (Exception e) { + if (e instanceof KeeperException ke) { + throw ke; + } + if (e instanceof InterruptedException ie) { + throw ie; + } + log.error("Unexpected exception", e); + throw new KeeperException.SystemErrorException(); } } - - public void unlockIfNeeded() { - if (acquired.compareAndSet(true, false)) { - lock.unlock(); + try { + long currentSessionId = getSessionId(); + return executor.submit(() -> { + inExecutorThreadLocal.set(true); + overrideSessionId(currentSessionId); + try { + return task.call(); + } finally { + removeSessionIdOverride(); + inExecutorThreadLocal.set(false); + } + }).get(); + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + if (cause instanceof KeeperException ke) { + throw ke; + } + if (cause instanceof InterruptedException ie) { + throw ie; } + log.error("Unexpected exception", e); + throw new KeeperException.SystemErrorException(); } } - private ThreadLocal<SingleAcquireAndReleaseLock> lockInstance; - - private SingleAcquireAndReleaseLock createLock() { - return new SingleAcquireAndReleaseLock(mutex); - } - - private void lock() { - lockInstance.get().lock(); - } - - private void unlockIfLocked() { - lockInstance.get().unlockIfNeeded(); + private void runInExecutorAsync(Runnable runnable) { + if (isStopped()) { + throw new RejectedExecutionException("MockZooKeeper is stopped"); + } + if (inExecutorThreadLocal.get()) { + try { + runnable.run(); + } catch (Throwable t) { + log.error("Unexpected exception", t); + } + return; + } + long currentSessionId = getSessionId(); + executor.submit(() -> { + try { + inExecutorThreadLocal.set(true); + overrideSessionId(currentSessionId); + try { + runnable.run(); + } finally { + removeSessionIdOverride(); + inExecutorThreadLocal.set(false); + } + } catch (Throwable t) { + log.error("Unexpected exception", t); + } + }); } - @Override - public void register(Watcher watcher) { - lock(); - sessionWatcher = watcher; - unlockIfLocked(); + private void runInExecutorSync(Runnable runnable) { + try { + runInExecutorReturningValue(() -> { + runnable.run(); + return null; + }); + } catch (Exception e) { + log.error("Unexpected error", e); + } } - @Override - public String create(String path, byte[] data, List<ACL> acl, CreateMode createMode) - throws KeeperException, InterruptedException { + private String internalCreate(String path, byte[] data, CreateMode createMode) throws KeeperException { final Set<Watcher> toNotifyCreate = Sets.newHashSet(); final Set<Watcher> toNotifyParent = Sets.newHashSet(); - final String parent = path.substring(0, path.lastIndexOf("/")); - - lock(); - try { - + final String parent = getParentName(path); - maybeThrowProgrammedFailure(Op.CREATE, path); + maybeThrowProgrammedFailure(Op.CREATE, path); - if (stopped) { - throw new KeeperException.ConnectionLossException(); - } - - if (tree.containsKey(path)) { - throw new KeeperException.NodeExistsException(path); - } + if (isStopped()) { + throw new KeeperException.ConnectionLossException(); + } - if (!parent.isEmpty() && !tree.containsKey(parent)) { - throw new KeeperException.NoNodeException(); - } + if (tree.containsKey(path)) { + throw new KeeperException.NodeExistsException(path); + } - if (createMode.isSequential()) { - MockZNode parentNode = tree.get(parent); - int parentVersion = tree.get(parent).getVersion(); - path = path + parentVersion; + MockZNode parentNode = tree.get(parent); - // Update parent version - tree.put(parent, - MockZNode.of(parentNode.getContent(), parentVersion + 1, parentNode.getEphemeralOwner())); - } + if (parentNode == null) { + throw new KeeperException.NoNodeException(parent); + } - tree.put(path, MockZNode.of(data, 0, createMode.isEphemeral() ? getEphemeralOwner() : -1L)); + if (createMode.isSequential()) { + int parentVersion = parentNode.getVersion(); + path = path + parentVersion; + parentNode.updateVersion(); + } - toNotifyCreate.addAll(watchers.get(path)); + parentNode.getChildren().add(getNodeName(path)); + tree.put(path, createMockZNode(data, createMode)); - if (!parent.isEmpty()) { - toNotifyParent.addAll(watchers.get(parent)); - } - watchers.removeAll(path); - } finally { - unlockIfLocked(); + toNotifyCreate.addAll(getWatchers(path)); + if (!ROOT_PATH.equals(parent)) { + toNotifyParent.addAll(getWatchers(parent)); } + watchers.removeAll(path); final String finalPath = path; executor.execute(() -> { + if (isStopped()) { + return; + } + triggerPersistentWatches(finalPath, parent, EventType.NodeCreated); toNotifyCreate.forEach( @@ -309,43 +373,62 @@ public class MockZooKeeper extends ZooKeeper { return path; } - protected long getEphemeralOwner() { - Long epheralOwner = epheralOwnerThreadLocal.get(); - if (epheralOwner != null) { - return epheralOwner; + private static String getParentName(String path) { + String parentName = path.substring(0, path.lastIndexOf('/')); + return parentName.length() > 0 ? parentName : "/"; + } + + private static String getNodeName(String path) { + return path.substring(path.lastIndexOf('/') + 1); + } + + private Collection<Watcher> getWatchers(String path) { + Set<NodeWatcher> nodeWatchers = watchers.get(path); + if (nodeWatchers != null) { + return nodeWatchers.stream().map(NodeWatcher::watcher).toList(); + } else { + return Collections.emptyList(); } - return getSessionId(); } - public void overrideEpheralOwner(long epheralOwner) { - epheralOwnerThreadLocal.set(epheralOwner); + @Override + public long getSessionId() { + Long overriddenSessionId = overriddenSessionIdThreadLocal.get(); + if (overriddenSessionId != null) { + return overriddenSessionId; + } + return sessionId; + } + + public void overrideSessionId(long sessionId) { + overriddenSessionIdThreadLocal.set(sessionId); } - public void removeEpheralOwnerOverride() { - epheralOwnerThreadLocal.remove(); + public void removeSessionIdOverride() { + overriddenSessionIdThreadLocal.remove(); } @Override public void create(final String path, final byte[] data, final List<ACL> acl, CreateMode createMode, final StringCallback cb, final Object ctx) { - - - executor.execute(() -> { + if (isStopped()) { + cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, null); + return; + } + runInExecutorAsync(() -> { try { - lock(); - - if (stopped) { + if (isStopped()) { cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, null); return; } final Set<Watcher> toNotifyCreate = Sets.newHashSet(); - toNotifyCreate.addAll(watchers.get(path)); + toNotifyCreate.addAll(getWatchers(path)); final Set<Watcher> toNotifyParent = Sets.newHashSet(); - final String parent = path.substring(0, path.lastIndexOf("/")); - if (!parent.isEmpty()) { - toNotifyParent.addAll(watchers.get(parent)); + final String parent = getParentName(path); + if (!ROOT_PATH.equals(parent)) { + toNotifyParent.addAll(getWatchers(parent)); } final String name; @@ -357,355 +440,247 @@ public class MockZooKeeper extends ZooKeeper { Optional<KeeperException.Code> failure = programmedFailure(Op.CREATE, path); if (failure.isPresent()) { - unlockIfLocked(); cb.processResult(failure.get().intValue(), path, ctx, null); - } else if (stopped) { - unlockIfLocked(); + } else if (isStopped()) { cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, null); } else if (tree.containsKey(path)) { - unlockIfLocked(); cb.processResult(KeeperException.Code.NODEEXISTS.intValue(), path, ctx, null); - } else if (!parent.isEmpty() && !tree.containsKey(parent)) { - unlockIfLocked(); - toNotifyParent.forEach(watcher -> watcher - .process(new WatchedEvent(EventType.NodeChildrenChanged, KeeperState.SyncConnected, - parent))); + } else if (!tree.containsKey(parent)) { + runNotifications(() -> { + toNotifyParent.forEach(watcher -> watcher + .process(new WatchedEvent(EventType.NodeChildrenChanged, KeeperState.SyncConnected, + parent))); + }); cb.processResult(KeeperException.Code.NONODE.intValue(), path, ctx, null); } else { - tree.put(name, MockZNode.of(data, 0, - createMode != null && createMode.isEphemeral() ? getEphemeralOwner() : -1L)); + tree.get(parent).getChildren().add(getNodeName(name)); + tree.put(name, createMockZNode(data, createMode)); watchers.removeAll(name); - unlockIfLocked(); cb.processResult(0, path, ctx, name); - - triggerPersistentWatches(path, parent, EventType.NodeCreated); - - toNotifyCreate.forEach( - watcher -> watcher.process( - new WatchedEvent(EventType.NodeCreated, - KeeperState.SyncConnected, - name))); - toNotifyParent.forEach( - watcher -> watcher.process( - new WatchedEvent(EventType.NodeChildrenChanged, - KeeperState.SyncConnected, - parent))); + runNotifications(() -> { + triggerPersistentWatches(path, parent, EventType.NodeCreated); + + toNotifyCreate.forEach( + watcher -> watcher.process( + new WatchedEvent(EventType.NodeCreated, + KeeperState.SyncConnected, + name))); + toNotifyParent.forEach( + watcher -> watcher.process( + new WatchedEvent(EventType.NodeChildrenChanged, + KeeperState.SyncConnected, + parent))); + }); } } catch (Throwable ex) { log.error("create path : {} error", path, ex); cb.processResult(KeeperException.Code.SYSTEMERROR.intValue(), path, ctx, null); - } finally { - unlockIfLocked(); } }); + } + + public void runNotifications(Runnable runnable) { + executor.execute(() -> { + if (isStopped()) { + return; + } + runnable.run(); + }); + } + private boolean isStopped() { + return stopped.get(); + } + + private MockZNode createMockZNode(byte[] data, CreateMode createMode) { + return MockZNode.of(data, 0, + createMode != null && createMode.isEphemeral() ? getSessionId() : NOT_EPHEMERAL); } @Override - public byte[] getData(String path, Watcher watcher, Stat stat) throws KeeperException { - lock(); - try { - maybeThrowProgrammedFailure(Op.GET, path); - MockZNode value = tree.get(path); - if (value == null) { - throw new KeeperException.NoNodeException(path); - } else { - if (watcher != null) { - watchers.put(path, watcher); - } - if (stat != null) { - applyToStat(value, stat); - } - return value.getContent(); + public byte[] getData(String path, Watcher watcher, Stat stat) throws KeeperException, InterruptedException { + return runInExecutorReturningValue(() -> internalGetData(path, watcher, stat)); + } + + private byte[] internalGetData(String path, Watcher watcher, Stat stat) throws KeeperException { + maybeThrowProgrammedFailure(Op.GET, path); + MockZNode value = tree.get(path); + if (value == null) { + throw new KeeperException.NoNodeException(path); + } else { + if (watcher != null) { + watchers.put(path, new NodeWatcher(watcher, getSessionId())); } - } finally { - unlockIfLocked(); + if (stat != null) { + value.applyToStat(stat); + } + return value.getContent(); } } @Override public void getData(final String path, boolean watch, final DataCallback cb, final Object ctx) { - executor.execute(() -> { - try { - checkReadOpDelay(); - Optional<KeeperException.Code> failure = programmedFailure(Op.GET, path); - if (failure.isPresent()) { - cb.processResult(failure.get().intValue(), path, ctx, null, null); - return; - } else if (stopped) { - cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, null, null); - return; - } - - MockZNode value; - lock(); - try { - value = tree.get(path); - } finally { - unlockIfLocked(); - } - - if (value == null) { - cb.processResult(KeeperException.Code.NONODE.intValue(), path, ctx, null, null); - } else { - cb.processResult(0, path, ctx, value.getContent(), createStatForZNode(value)); - } - } catch (Throwable ex) { - log.error("get data : {} error", path, ex); - cb.processResult(KeeperException.Code.SYSTEMERROR.intValue(), path, ctx, null, null); - } - }); + getData(path, null, cb, ctx); } @Override public void getData(final String path, final Watcher watcher, final DataCallback cb, final Object ctx) { - executor.execute(() -> { + if (isStopped()) { + cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, null, null); + return; + } + runInExecutorAsync(() -> { checkReadOpDelay(); try { - lock(); Optional<KeeperException.Code> failure = programmedFailure(Op.GET, path); if (failure.isPresent()) { - unlockIfLocked(); cb.processResult(failure.get().intValue(), path, ctx, null, null); return; - } else if (stopped) { - unlockIfLocked(); + } else if (isStopped()) { cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, null, null); return; } MockZNode value = tree.get(path); if (value == null) { - unlockIfLocked(); cb.processResult(KeeperException.Code.NONODE.intValue(), path, ctx, null, null); } else { if (watcher != null) { - watchers.put(path, watcher); + watchers.put(path, new NodeWatcher(watcher, getSessionId())); } - - Stat stat = createStatForZNode(value); - unlockIfLocked(); + Stat stat = value.getStat(); cb.processResult(0, path, ctx, value.getContent(), stat); } } catch (Throwable ex) { log.error("get data : {} error", path, ex); cb.processResult(KeeperException.Code.SYSTEMERROR.intValue(), path, ctx, null, null); - } finally { - unlockIfLocked(); } }); } @Override public void getChildren(final String path, final Watcher watcher, final ChildrenCallback cb, final Object ctx) { - executor.execute(() -> { - List<String> children = Lists.newArrayList(); + if (isStopped()) { + cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, null); + return; + } + runInExecutorAsync(() -> { try { - lock(); Optional<KeeperException.Code> failure = programmedFailure(Op.GET_CHILDREN, path); if (failure.isPresent()) { - unlockIfLocked(); cb.processResult(failure.get().intValue(), path, ctx, null); return; - } else if (stopped) { - unlockIfLocked(); + } else if (isStopped()) { cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, null); return; } if (!tree.containsKey(path)) { - unlockIfLocked(); cb.processResult(KeeperException.Code.NONODE.intValue(), path, ctx, null); return; } - for (String item : tree.tailMap(path).keySet()) { - if (!item.startsWith(path)) { - break; - } else { - if (path.length() >= item.length()) { - continue; - } - - String child = item.substring(path.length() + 1); - if (item.charAt(path.length()) == '/' && !child.contains("/")) { - children.add(child); - } - } - } - + List<String> children = findFirstLevelChildren(path); if (watcher != null) { - watchers.put(path, watcher); + watchers.put(path, new NodeWatcher(watcher, getSessionId())); } cb.processResult(0, path, ctx, children); } catch (Throwable ex) { log.error("get children : {} error", path, ex); cb.processResult(KeeperException.Code.SYSTEMERROR.intValue(), path, ctx, null); - } finally { - unlockIfLocked(); } - }); } @Override - public List<String> getChildren(String path, Watcher watcher) throws KeeperException { - lock(); - try { - maybeThrowProgrammedFailure(Op.GET_CHILDREN, path); - - if (!tree.containsKey(path)) { - throw new KeeperException.NoNodeException(); - } - - String firstKey = path.equals("/") ? path : path + "/"; - String lastKey = path.equals("/") ? "0" : path + "0"; // '0' is lexicographically just after '/' + public List<String> getChildren(String path, Watcher watcher) throws KeeperException, InterruptedException { + return runInExecutorReturningValue(() -> internalGetChildren(path, watcher)); + } - Set<String> children = new TreeSet<>(); - tree.subMap(firstKey, false, lastKey, false).forEach((key, value) -> { - String relativePath = key.replace(firstKey, ""); + private List<String> internalGetChildren(String path, Watcher watcher) throws KeeperException { + maybeThrowProgrammedFailure(Op.GET_CHILDREN, path); - // Only return first-level children - String child = relativePath.split("/", 2)[0]; - children.add(child); - }); - - if (watcher != null) { - watchers.put(path, watcher); - } + if (!tree.containsKey(path)) { + throw new KeeperException.NoNodeException(path); + } - return new ArrayList<>(children); - } finally { - unlockIfLocked(); + if (watcher != null) { + watchers.put(path, new NodeWatcher(watcher, getSessionId())); } + + return findFirstLevelChildren(path); } @Override public List<String> getChildren(String path, boolean watch) throws KeeperException, InterruptedException { - lock(); - try { - maybeThrowProgrammedFailure(Op.GET_CHILDREN, path); - - if (stopped) { - throw new KeeperException.ConnectionLossException(); - } else if (!tree.containsKey(path)) { - throw new KeeperException.NoNodeException(); - } - - String firstKey = path.equals("/") ? path : path + "/"; - String lastKey = path.equals("/") ? "0" : path + "0"; // '0' is lexicographically just after '/' - - Set<String> children = new TreeSet<>(); - tree.subMap(firstKey, false, lastKey, false).forEach((key, value) -> { - String relativePath = key.replace(firstKey, ""); - - // Only return first-level children - String child = relativePath.split("/", 2)[0]; - children.add(child); - }); - - return new ArrayList<>(children); - } finally { - unlockIfLocked(); - } + return getChildren(path, null); } @Override public void getChildren(final String path, boolean watcher, final Children2Callback cb, final Object ctx) { - executor.execute(() -> { - Set<String> children = new TreeSet<>(); + if (isStopped()) { + cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, null, null); + return; + } + runInExecutorAsync(() -> { try { - lock(); + MockZNode mockZNode = tree.get(path); + Stat stat = mockZNode != null ? mockZNode.getStat() : null; Optional<KeeperException.Code> failure = programmedFailure(Op.GET_CHILDREN, path); if (failure.isPresent()) { - unlockIfLocked(); cb.processResult(failure.get().intValue(), path, ctx, null, null); return; - } else if (stopped) { - unlockIfLocked(); + } else if (isStopped()) { cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, null, null); return; - } else if (!tree.containsKey(path)) { - unlockIfLocked(); + } else if (mockZNode == null) { cb.processResult(KeeperException.Code.NONODE.intValue(), path, ctx, null, null); return; } - String firstKey = path.equals("/") ? path : path + "/"; - String lastKey = path.equals("/") ? "0" : path + "0"; // '0' is lexicographically just after '/' - - tree.subMap(firstKey, false, lastKey, false).forEach((key, value) -> { - String relativePath = key.replace(firstKey, ""); - - // Only return first-level children - String child = relativePath.split("/", 2)[0]; - children.add(child); - }); - cb.processResult(0, path, ctx, new ArrayList<>(children), new Stat()); + List<String> children = findFirstLevelChildren(path); + cb.processResult(0, path, ctx, children, stat); } catch (Throwable ex) { log.error("get children : {} error", path, ex); cb.processResult(KeeperException.Code.SYSTEMERROR.intValue(), path, ctx, null, null); - } finally { - unlockIfLocked(); } }); + } + private List<String> findFirstLevelChildren(String path) { + return new ArrayList<>(tree.get(path).getChildren()); + } + + private boolean hasChildren(String path) { + return !tree.get(path).getChildren().isEmpty(); } @Override public Stat exists(String path, boolean watch) throws KeeperException, InterruptedException { - lock(); - try { - maybeThrowProgrammedFailure(Op.EXISTS, path); + return runInExecutorReturningValue(() -> internalGetStat(path, null)); + } - if (stopped) { - throw new KeeperException.ConnectionLossException(); - } + private Stat internalGetStat(String path, Watcher watcher) throws KeeperException { + maybeThrowProgrammedFailure(Op.EXISTS, path); - if (tree.containsKey(path)) { - return createStatForZNode(tree.get(path)); - } else { - return null; - } - } finally { - unlockIfLocked(); + if (isStopped()) { + throw new KeeperException.ConnectionLossException(); } - } - private static Stat createStatForZNode(MockZNode zNode) { - return applyToStat(zNode, new Stat()); - } + if (watcher != null) { + watchers.put(path, new NodeWatcher(watcher, getSessionId())); + } - private static Stat applyToStat(MockZNode zNode, Stat stat) { - stat.setVersion(zNode.getVersion()); - if (zNode.getEphemeralOwner() != -1L) { - stat.setEphemeralOwner(zNode.getEphemeralOwner()); + if (tree.containsKey(path)) { + return tree.get(path).getStat(); + } else { + return null; } - return stat; } @Override public Stat exists(String path, Watcher watcher) throws KeeperException, InterruptedException { - lock(); - try { - maybeThrowProgrammedFailure(Op.EXISTS, path); - - if (stopped) { - throw new KeeperException.ConnectionLossException(); - } - - if (watcher != null) { - watchers.put(path, watcher); - } - - if (tree.containsKey(path)) { - return createStatForZNode(tree.get(path)); - } else { - return null; - } - } finally { - unlockIfLocked(); - } + return runInExecutorReturningValue(() -> internalGetStat(path, watcher)); } @Override @@ -715,160 +690,149 @@ public class MockZooKeeper extends ZooKeeper { @Override public void exists(String path, Watcher watcher, StatCallback cb, Object ctx) { - executor.execute(() -> { + if (isStopped()) { + cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, null); + return; + } + runInExecutorAsync(() -> { try { - lock(); Optional<KeeperException.Code> failure = programmedFailure(Op.EXISTS, path); if (failure.isPresent()) { - unlockIfLocked(); cb.processResult(failure.get().intValue(), path, ctx, null); return; - } else if (stopped) { - unlockIfLocked(); + } else if (isStopped()) { cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, null); return; } if (watcher != null) { - watchers.put(path, watcher); + watchers.put(path, new NodeWatcher(watcher, getSessionId())); } - if (tree.containsKey(path)) { - unlockIfLocked(); - cb.processResult(0, path, ctx, new Stat()); + MockZNode mockZNode = tree.get(path); + if (mockZNode != null) { + Stat stat = mockZNode.getStat(); + cb.processResult(0, path, ctx, stat); } else { - unlockIfLocked(); cb.processResult(KeeperException.Code.NONODE.intValue(), path, ctx, null); } } catch (Throwable ex) { log.error("exist : {} error", path, ex); cb.processResult(KeeperException.Code.SYSTEMERROR.intValue(), path, ctx, null); - } finally { - unlockIfLocked(); } }); } @Override public void sync(String path, VoidCallback cb, Object ctx) { - executor.execute(() -> { + if (isStopped()) { + cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx); + return; + } + runInExecutorAsync(() -> { Optional<KeeperException.Code> failure = programmedFailure(Op.SYNC, path); if (failure.isPresent()) { cb.processResult(failure.get().intValue(), path, ctx); return; - } else if (stopped) { + } else if (isStopped()) { cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx); return; } - cb.processResult(0, path, ctx); }); - } @Override public Stat setData(final String path, byte[] data, int version) throws KeeperException, InterruptedException { - final Set<Watcher> toNotify = Sets.newHashSet(); - MockZNode newZNode; - - lock(); - try { - maybeThrowProgrammedFailure(Op.SET, path); - - if (stopped) { - throw new KeeperException.ConnectionLossException(); - } + return runInExecutorReturningValue(() -> internalSetData(path, data, version)); + } - if (!tree.containsKey(path)) { - throw new KeeperException.NoNodeException(); - } + private Stat internalSetData(String path, byte[] data, int version) throws KeeperException { + final Set<Watcher> toNotify = Sets.newHashSet(); + maybeThrowProgrammedFailure(Op.SET, path); - MockZNode mockZNode = tree.get(path); - int currentVersion = mockZNode.getVersion(); + if (isStopped()) { + throw new KeeperException.ConnectionLossException(); + } - // Check version - if (version != -1 && version != currentVersion) { - throw new KeeperException.BadVersionException(path); - } + if (!tree.containsKey(path)) { + throw new KeeperException.NoNodeException(path); + } - log.debug("[{}] Updating -- current version: {}", path, currentVersion); - newZNode = MockZNode.of(data, currentVersion + 1, mockZNode.getEphemeralOwner()); - tree.put(path, newZNode); + MockZNode mockZNode = tree.get(path); + int currentVersion = mockZNode.getVersion(); - toNotify.addAll(watchers.get(path)); - watchers.removeAll(path); - } finally { - unlockIfLocked(); + // Check version + if (version != -1 && version != currentVersion) { + throw new KeeperException.BadVersionException(path); } - executor.execute(() -> { + log.debug("[{}] Updating -- current version: {}", path, currentVersion); + mockZNode.updateData(data); + Stat stat = mockZNode.getStat(); + toNotify.addAll(getWatchers(path)); + watchers.removeAll(path); + + runNotifications(() -> { triggerPersistentWatches(path, null, EventType.NodeDataChanged); toNotify.forEach(watcher -> watcher .process(new WatchedEvent(EventType.NodeDataChanged, KeeperState.SyncConnected, path))); }); - return createStatForZNode(newZNode); + return stat; } @Override public void setData(final String path, final byte[] data, int version, final StatCallback cb, final Object ctx) { - if (stopped) { + if (isStopped()) { cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, null); return; } - - executor.execute(() -> { + runInExecutorAsync(() -> { try { final Set<Watcher> toNotify = Sets.newHashSet(); Stat stat; - lock(); - try { - Optional<KeeperException.Code> failure = programmedFailure(Op.SET, path); - if (failure.isPresent()) { - unlockIfLocked(); - cb.processResult(failure.get().intValue(), path, ctx, null); - return; - } else if (stopped) { - unlockIfLocked(); - cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, null); - return; - } - - if (!tree.containsKey(path)) { - unlockIfLocked(); - cb.processResult(KeeperException.Code.NONODE.intValue(), path, ctx, null); - return; - } + Optional<KeeperException.Code> failure = programmedFailure(Op.SET, path); + if (failure.isPresent()) { + cb.processResult(failure.get().intValue(), path, ctx, null); + return; + } else if (isStopped()) { + cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, null); + return; + } - MockZNode mockZNode = tree.get(path); - int currentVersion = mockZNode.getVersion(); + if (!tree.containsKey(path)) { + cb.processResult(KeeperException.Code.NONODE.intValue(), path, ctx, null); + return; + } - // Check version - if (version != -1 && version != currentVersion) { - log.debug("[{}] Current version: {} -- Expected: {}", path, currentVersion, version); - unlockIfLocked(); - cb.processResult(KeeperException.Code.BADVERSION.intValue(), path, ctx, null); - return; - } + MockZNode mockZNode = tree.get(path); + int currentVersion = mockZNode.getVersion(); - log.debug("[{}] Updating -- current version: {}", path, currentVersion); - MockZNode newZNode = MockZNode.of(data, currentVersion + 1, mockZNode.getEphemeralOwner()); - tree.put(path, newZNode); - stat = createStatForZNode(newZNode); - } finally { - unlockIfLocked(); + // Check version + if (version != -1 && version != currentVersion) { + log.debug("[{}] Current version: {} -- Expected: {}", path, currentVersion, version); + Stat currentStat = mockZNode.getStat(); + cb.processResult(KeeperException.Code.BADVERSION.intValue(), path, ctx, currentStat); + return; } + + log.debug("[{}] Updating -- current version: {}", path, currentVersion); + mockZNode.updateData(data); + stat = mockZNode.getStat(); cb.processResult(0, path, ctx, stat); - toNotify.addAll(watchers.get(path)); + toNotify.addAll(getWatchers(path)); watchers.removeAll(path); - for (Watcher watcher : toNotify) { - watcher.process(new WatchedEvent(EventType.NodeDataChanged, KeeperState.SyncConnected, path)); - } + runNotifications(() -> { + triggerPersistentWatches(path, null, EventType.NodeDataChanged); - triggerPersistentWatches(path, null, EventType.NodeDataChanged); + for (Watcher watcher : toNotify) { + watcher.process(new WatchedEvent(EventType.NodeDataChanged, KeeperState.SyncConnected, path)); + } + }); } catch (Throwable ex) { log.error("Update data : {} error", path, ex); cb.processResult(KeeperException.Code.SYSTEMERROR.intValue(), path, ctx, null); @@ -878,50 +842,49 @@ public class MockZooKeeper extends ZooKeeper { @Override public void delete(final String path, int version) throws InterruptedException, KeeperException { + runInExecutorReturningValue(() -> { + internalDelete(path, version); + return null; + }); + } + + private void internalDelete(String path, int version) throws KeeperException { maybeThrowProgrammedFailure(Op.DELETE, path); final Set<Watcher> toNotifyDelete; final Set<Watcher> toNotifyParent; final String parent; - lock(); - try { - if (stopped) { - throw new KeeperException.ConnectionLossException(); - } else if (!tree.containsKey(path)) { - throw new KeeperException.NoNodeException(path); - } else if (hasChildren(path)) { - throw new KeeperException.NotEmptyException(path); - } + if (isStopped()) { + throw new KeeperException.ConnectionLossException(); + } else if (!tree.containsKey(path)) { + throw new KeeperException.NoNodeException(path); + } else if (hasChildren(path)) { + throw new KeeperException.NotEmptyException(path); + } - if (version != -1) { - int currentVersion = tree.get(path).getVersion(); - if (version != currentVersion) { - throw new KeeperException.BadVersionException(path); - } + if (version != -1) { + int currentVersion = tree.get(path).getVersion(); + if (version != currentVersion) { + throw new KeeperException.BadVersionException(path); } + } - tree.remove(path); - - toNotifyDelete = Sets.newHashSet(); - toNotifyDelete.addAll(watchers.get(path)); + parent = getParentName(path); + tree.remove(path); + tree.get(parent).getChildren().remove(getNodeName(path)); - toNotifyParent = Sets.newHashSet(); - parent = path.substring(0, path.lastIndexOf("/")); - if (!parent.isEmpty()) { - toNotifyParent.addAll(watchers.get(parent)); - } + toNotifyDelete = Sets.newHashSet(); + toNotifyDelete.addAll(getWatchers(path)); - watchers.removeAll(path); - } finally { - unlockIfLocked(); + toNotifyParent = Sets.newHashSet(); + if (!ROOT_PATH.equals(parent)) { + toNotifyParent.addAll(getWatchers(parent)); } - executor.execute(() -> { - if (stopped) { - return; - } + watchers.removeAll(path); + runNotifications(() -> { for (Watcher watcher1 : toNotifyDelete) { watcher1.process(new WatchedEvent(EventType.NodeDeleted, KeeperState.SyncConnected, path)); } @@ -935,179 +898,209 @@ public class MockZooKeeper extends ZooKeeper { @Override public void delete(final String path, int version, final VoidCallback cb, final Object ctx) { - Runnable r = () -> { + if (isStopped()) { + cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx); + return; + } + runInExecutorAsync(() -> { try { - lock(); final Set<Watcher> toNotifyDelete = Sets.newHashSet(); - toNotifyDelete.addAll(watchers.get(path)); + toNotifyDelete.addAll(getWatchers(path)); final Set<Watcher> toNotifyParent = Sets.newHashSet(); - final String parent = path.substring(0, path.lastIndexOf("/")); - if (!parent.isEmpty()) { - toNotifyParent.addAll(watchers.get(parent)); + final String parent = getParentName(path); + if (!ROOT_PATH.equals(parent)) { + toNotifyParent.addAll(getWatchers(parent)); } watchers.removeAll(path); Optional<KeeperException.Code> failure = programmedFailure(Op.DELETE, path); if (failure.isPresent()) { - unlockIfLocked(); cb.processResult(failure.get().intValue(), path, ctx); - } else if (stopped) { - unlockIfLocked(); + } else if (isStopped()) { cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx); } else if (!tree.containsKey(path)) { - unlockIfLocked(); cb.processResult(KeeperException.Code.NONODE.intValue(), path, ctx); } else if (hasChildren(path)) { - unlockIfLocked(); cb.processResult(KeeperException.Code.NOTEMPTY.intValue(), path, ctx); } else { if (version != -1) { int currentVersion = tree.get(path).getVersion(); if (version != currentVersion) { - unlockIfLocked(); cb.processResult(KeeperException.Code.BADVERSION.intValue(), path, ctx); return; } } tree.remove(path); - - unlockIfLocked(); + tree.get(parent).getChildren().remove(getNodeName(path)); cb.processResult(0, path, ctx); - toNotifyDelete.forEach(watcher -> watcher - .process(new WatchedEvent(EventType.NodeDeleted, KeeperState.SyncConnected, path))); - toNotifyParent.forEach(watcher -> watcher - .process(new WatchedEvent(EventType.NodeChildrenChanged, KeeperState.SyncConnected, - parent))); - triggerPersistentWatches(path, parent, EventType.NodeDeleted); + runNotifications(() -> { + triggerPersistentWatches(path, parent, EventType.NodeDeleted); + toNotifyDelete.forEach(watcher -> watcher + .process(new WatchedEvent(EventType.NodeDeleted, KeeperState.SyncConnected, path))); + toNotifyParent.forEach(watcher -> watcher + .process(new WatchedEvent(EventType.NodeChildrenChanged, KeeperState.SyncConnected, + parent))); + }); } } catch (Throwable ex) { log.error("delete path : {} error", path, ex); cb.processResult(KeeperException.Code.SYSTEMERROR.intValue(), path, ctx); - } finally { - unlockIfLocked(); } - }; - - try { - executor.execute(r); - } catch (RejectedExecutionException ree) { - cb.processResult(KeeperException.Code.SESSIONEXPIRED.intValue(), path, ctx); - } - + }); } @Override public void multi(Iterable<org.apache.zookeeper.Op> ops, AsyncCallback.MultiCallback cb, Object ctx) { - try { - List<OpResult> res = multi(ops); - cb.processResult(KeeperException.Code.OK.intValue(), null, ctx, res); - } catch (Exception e) { - cb.processResult(KeeperException.Code.APIERROR.intValue(), null, ctx, null); + if (isStopped()) { + cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), null, ctx, null); + return; } + runInExecutorAsync(() -> { + try { + List<OpResult> res = multi(ops); + cb.processResult(KeeperException.Code.OK.intValue(), null, ctx, res); + } catch (Exception e) { + cb.processResult(KeeperException.Code.APIERROR.intValue(), null, ctx, null); + } + }); } @Override public List<OpResult> multi(Iterable<org.apache.zookeeper.Op> ops) throws InterruptedException, KeeperException { + return runInExecutorReturningValue(() -> internalMulti(ops)); + } + + private List<OpResult> internalMulti(Iterable<org.apache.zookeeper.Op> ops) { List<OpResult> res = new ArrayList<>(); - try { - for (org.apache.zookeeper.Op op : ops) { - switch (op.getType()) { - case ZooDefs.OpCode.create -> { + for (org.apache.zookeeper.Op op : ops) { + switch (op.getType()) { + case ZooDefs.OpCode.create -> { + handleOperation("create", op, () -> { org.apache.zookeeper.Op.Create opc = ((org.apache.zookeeper.Op.Create) op); CreateMode cm = CreateMode.fromFlag(opc.flags); - String path = this.create(op.getPath(), opc.data, null, cm); + String path = create(op.getPath(), opc.data, null, cm); res.add(new OpResult.CreateResult(path)); - } - case ZooDefs.OpCode.delete -> { - this.delete(op.getPath(), (int) FieldUtils.readField(op, "version", true)); + }, res); + } + case ZooDefs.OpCode.delete -> { + handleOperation("delete", op, () -> { + DeleteRequest deleteRequest = (DeleteRequest) op.toRequestRecord(); + delete(op.getPath(), deleteRequest.getVersion()); res.add(new OpResult.DeleteResult()); - } - case ZooDefs.OpCode.setData -> { - Stat stat = this.setData( - op.getPath(), - (byte[]) FieldUtils.readField(op, "data", true), - (int) FieldUtils.readField(op, "version", true)); + }, res); + } + case ZooDefs.OpCode.setData -> { + handleOperation("setData", op, () -> { + SetDataRequest setDataRequest = (SetDataRequest) op.toRequestRecord(); + Stat stat = setData(op.getPath(), setDataRequest.getData(), setDataRequest.getVersion()); res.add(new OpResult.SetDataResult(stat)); - } - case ZooDefs.OpCode.getChildren -> { - try { - List<String> children = this.getChildren(op.getPath(), null); - res.add(new OpResult.GetChildrenResult(children)); - } catch (KeeperException e) { - res.add(new OpResult.ErrorResult(e.code().intValue())); - } - } - case ZooDefs.OpCode.getData -> { - Stat stat = new Stat(); - try { - byte[] payload = this.getData(op.getPath(), null, stat); - res.add(new OpResult.GetDataResult(payload, stat)); - } catch (KeeperException e) { - res.add(new OpResult.ErrorResult(e.code().intValue())); - } - } + }, res); + } + case ZooDefs.OpCode.getChildren -> { + handleOperation("getChildren", op, () -> { + List<String> children = getChildren(op.getPath(), null); + res.add(new OpResult.GetChildrenResult(children)); + }, res); + } + case ZooDefs.OpCode.getData -> { + Stat stat = new Stat(); + handleOperation("getData", op, () -> { + byte[] payload = getData(op.getPath(), null, stat); + res.add(new OpResult.GetDataResult(payload, stat)); + }, res); + } + default -> { + log.error("Unsupported operation for path {} type {} kind {} request {}", op.getPath(), + op.getType(), op.getKind(), op.toRequestRecord()); + res.add(new OpResult.ErrorResult(KeeperException.Code.APIERROR.intValue())); } } - } catch (KeeperException e) { - res.add(new OpResult.ErrorResult(e.code().intValue())); - int total = Iterables.size(ops); - for (int i = res.size(); i < total; i++) { + } + return res; + } + + interface ZkOpHandler { + void handle() throws KeeperException, InterruptedException; + } + + private void handleOperation(String opName, org.apache.zookeeper.Op op, ZkOpHandler handler, List<OpResult> res) { + try { + handler.handle(); + } catch (Exception e) { + if (e instanceof KeeperException keeperException) { + res.add(new OpResult.ErrorResult(keeperException.code().intValue())); + } else { + log.error("Error handling {} operation for path {} type {} kind {} request {}", opName, op.getPath(), + op.getType(), op.getKind(), op.toRequestRecord(), e); res.add(new OpResult.ErrorResult(KeeperException.Code.RUNTIMEINCONSISTENCY.intValue())); } - } catch (IllegalAccessException e) { - throw new IllegalStateException(e); } - return res; } @Override - public synchronized void addWatch(String basePath, Watcher watcher, AddWatchMode mode) { - persistentWatchers.add(new PersistentWatcher(basePath, watcher, mode)); + public void addWatch(String basePath, Watcher watcher, AddWatchMode mode) { + runInExecutorSync(() -> { + persistentWatchers.add(new PersistentWatcher(basePath, watcher, mode, getSessionId())); + }); } @Override public void addWatch(String basePath, Watcher watcher, AddWatchMode mode, VoidCallback cb, Object ctx) { - if (stopped) { + if (isStopped()) { cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), basePath, ctx); return; } - - executor.execute(() -> { - synchronized (MockZooKeeper.this) { - persistentWatchers.add(new PersistentWatcher(basePath, watcher, mode)); - } - + runInExecutorAsync(() -> { + addWatch(basePath, watcher, mode); cb.processResult(KeeperException.Code.OK.intValue(), basePath, ctx); }); } + public synchronized void increaseRefCount() { + referenceCount++; + } + + public synchronized MockZooKeeper registerCloseable(AutoCloseable closeable) { + closeables.add(closeable); + return this; + } + @Override - public void close() throws InterruptedException { - shutdown(); + public synchronized void close() throws InterruptedException { + if (--referenceCount <= 0) { + shutdown(); + closeables.forEach(c -> { + try { + c.close(); + } catch (Exception e) { + log.error("Error closing closeable", e); + } + }); + closeables.clear(); + } } public void shutdown() throws InterruptedException { - lock(); - try { - stopped = true; - tree.clear(); - watchers.clear(); + if (stopped.compareAndSet(false, true)) { + Future<?> shutdownTask = executor.submit(() -> { + tree.clear(); + watchers.clear(); + persistentWatchers.clear(); + }); try { - executor.shutdownNow(); - executor.awaitTermination(5, TimeUnit.SECONDS); - } catch (InterruptedException ex) { - log.error("MockZooKeeper shutdown had error", ex); + shutdownTask.get(); + } catch (ExecutionException e) { + log.error("Error shutting down", e); } - } finally { - unlockIfLocked(); + MoreExecutors.shutdownAndAwaitTermination(executor, 10, TimeUnit.SECONDS); } } Optional<KeeperException.Code> programmedFailure(Op op, String path) { - KeeperException.Code error = this.alwaysFail.get(); + KeeperException.Code error = alwaysFail.get(); if (error != KeeperException.Code.OK) { return Optional.of(error); } @@ -1144,26 +1137,17 @@ public class MockZooKeeper extends ZooKeeper { } public void setAlwaysFail(KeeperException.Code rc) { - this.alwaysFail.set(rc); + alwaysFail.set(rc); } public void unsetAlwaysFail() { - this.alwaysFail.set(KeeperException.Code.OK); + alwaysFail.set(KeeperException.Code.OK); } public void setSessionId(long id) { sessionId = id; } - @Override - public long getSessionId() { - return sessionId; - } - - private boolean hasChildren(String path) { - return !tree.subMap(path + '/', path + '0').isEmpty(); - } - @Override public String toString() { return "MockZookeeper"; @@ -1182,11 +1166,11 @@ public class MockZooKeeper extends ZooKeeper { private void triggerPersistentWatches(String path, String parent, EventType eventType) { persistentWatchers.forEach(w -> { if (w.mode == AddWatchMode.PERSISTENT_RECURSIVE) { - if (path.startsWith(w.getPath())) { + if (path.startsWith(w.path())) { w.watcher.process(new WatchedEvent(eventType, KeeperState.SyncConnected, path)); } } else if (w.mode == AddWatchMode.PERSISTENT) { - if (w.getPath().equals(path)) { + if (w.path().equals(path)) { w.watcher.process(new WatchedEvent(eventType, KeeperState.SyncConnected, path)); } @@ -1199,5 +1183,26 @@ public class MockZooKeeper extends ZooKeeper { }); } + public void deleteEphemeralNodes(long sessionId) { + if (sessionId != NOT_EPHEMERAL) { + runInExecutorSync(() -> { + tree.values().removeIf(zNode -> zNode.getEphemeralOwner() == sessionId); + }); + } + } + + + public void deleteWatchers(long sessionId) { + runInExecutorSync(() -> { + // remove all persistent watchers for the session + persistentWatchers.removeIf(w -> w.sessionId == sessionId); + // remove all watchers for the session + List<Map.Entry<String, NodeWatcher>> watchersForSession = + watchers.entries().stream().filter(e -> e.getValue().sessionId == sessionId).toList(); + watchersForSession + .forEach(e -> watchers.remove(e.getKey(), e.getValue())); + }); + } + private static final Logger log = LoggerFactory.getLogger(MockZooKeeper.class); } diff --git a/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeperSession.java b/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeperSession.java index a286a75aa91..c812423b728 100644 --- a/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeperSession.java +++ b/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeperSession.java @@ -40,7 +40,7 @@ public class MockZooKeeperSession extends ZooKeeper { private MockZooKeeper mockZooKeeper; - private long sessionId = 0L; + private long sessionId = 1L; private static final Objenesis objenesis = new ObjenesisStd(); @@ -59,6 +59,9 @@ public class MockZooKeeperSession extends ZooKeeper { mockZooKeeperSession.mockZooKeeper = mockZooKeeper; mockZooKeeperSession.sessionId = sessionIdGenerator.getAndIncrement(); mockZooKeeperSession.closeMockZooKeeperOnClose = closeMockZooKeeperOnClose; + if (closeMockZooKeeperOnClose) { + mockZooKeeper.increaseRefCount(); + } return mockZooKeeperSession; } @@ -81,17 +84,22 @@ public class MockZooKeeperSession extends ZooKeeper { @Override public void register(Watcher watcher) { - mockZooKeeper.register(watcher); + try { + mockZooKeeper.overrideSessionId(getSessionId()); + mockZooKeeper.register(watcher); + } finally { + mockZooKeeper.removeSessionIdOverride(); + } } @Override public String create(String path, byte[] data, List<ACL> acl, CreateMode createMode) throws KeeperException, InterruptedException { try { - mockZooKeeper.overrideEpheralOwner(getSessionId()); + mockZooKeeper.overrideSessionId(getSessionId()); return mockZooKeeper.create(path, data, acl, createMode); } finally { - mockZooKeeper.removeEpheralOwnerOverride(); + mockZooKeeper.removeSessionIdOverride(); } } @@ -99,134 +107,257 @@ public class MockZooKeeperSession extends ZooKeeper { public void create(final String path, final byte[] data, final List<ACL> acl, CreateMode createMode, final AsyncCallback.StringCallback cb, final Object ctx) { try { - mockZooKeeper.overrideEpheralOwner(getSessionId()); + mockZooKeeper.overrideSessionId(getSessionId()); mockZooKeeper.create(path, data, acl, createMode, cb, ctx); } finally { - mockZooKeeper.removeEpheralOwnerOverride(); + mockZooKeeper.removeSessionIdOverride(); } } @Override - public byte[] getData(String path, Watcher watcher, Stat stat) throws KeeperException { - return mockZooKeeper.getData(path, watcher, stat); + public byte[] getData(String path, Watcher watcher, Stat stat) throws KeeperException, InterruptedException { + try { + mockZooKeeper.overrideSessionId(getSessionId()); + return mockZooKeeper.getData(path, watcher, stat); + } finally { + mockZooKeeper.removeSessionIdOverride(); + } } @Override public void getData(final String path, boolean watch, final DataCallback cb, final Object ctx) { - mockZooKeeper.getData(path, watch, cb, ctx); + try { + mockZooKeeper.overrideSessionId(getSessionId()); + mockZooKeeper.getData(path, watch, cb, ctx); + } finally { + mockZooKeeper.removeSessionIdOverride(); + } } @Override public void getData(final String path, final Watcher watcher, final DataCallback cb, final Object ctx) { - mockZooKeeper.getData(path, watcher, cb, ctx); + try { + mockZooKeeper.overrideSessionId(getSessionId()); + mockZooKeeper.getData(path, watcher, cb, ctx); + } finally { + mockZooKeeper.removeSessionIdOverride(); + } } @Override public void getChildren(final String path, final Watcher watcher, final ChildrenCallback cb, final Object ctx) { - mockZooKeeper.getChildren(path, watcher, cb, ctx); + try { + mockZooKeeper.overrideSessionId(getSessionId()); + mockZooKeeper.getChildren(path, watcher, cb, ctx); + } finally { + mockZooKeeper.removeSessionIdOverride(); + } } @Override - public List<String> getChildren(String path, Watcher watcher) throws KeeperException { - return mockZooKeeper.getChildren(path, watcher); + public List<String> getChildren(String path, Watcher watcher) throws KeeperException, InterruptedException { + try { + mockZooKeeper.overrideSessionId(getSessionId()); + return mockZooKeeper.getChildren(path, watcher); + } finally { + mockZooKeeper.removeSessionIdOverride(); + } } @Override public List<String> getChildren(String path, boolean watch) throws KeeperException, InterruptedException { - return mockZooKeeper.getChildren(path, watch); + try { + mockZooKeeper.overrideSessionId(getSessionId()); + return mockZooKeeper.getChildren(path, watch); + } finally { + mockZooKeeper.removeSessionIdOverride(); + } } @Override public void getChildren(final String path, boolean watcher, final Children2Callback cb, final Object ctx) { - mockZooKeeper.getChildren(path, watcher, cb, ctx); + try { + mockZooKeeper.overrideSessionId(getSessionId()); + mockZooKeeper.getChildren(path, watcher, cb, ctx); + } finally { + mockZooKeeper.removeSessionIdOverride(); + } } @Override public Stat exists(String path, boolean watch) throws KeeperException, InterruptedException { - return mockZooKeeper.exists(path, watch); + try { + mockZooKeeper.overrideSessionId(getSessionId()); + return mockZooKeeper.exists(path, watch); + } finally { + mockZooKeeper.removeSessionIdOverride(); + } } @Override public Stat exists(String path, Watcher watcher) throws KeeperException, InterruptedException { - return mockZooKeeper.exists(path, watcher); + try { + mockZooKeeper.overrideSessionId(getSessionId()); + return mockZooKeeper.exists(path, watcher); + } finally { + mockZooKeeper.removeSessionIdOverride(); + } } @Override public void exists(String path, boolean watch, StatCallback cb, Object ctx) { - mockZooKeeper.exists(path, watch, cb, ctx); + try { + mockZooKeeper.overrideSessionId(getSessionId()); + mockZooKeeper.exists(path, watch, cb, ctx); + } finally { + mockZooKeeper.removeSessionIdOverride(); + } } @Override public void exists(String path, Watcher watcher, StatCallback cb, Object ctx) { - mockZooKeeper.exists(path, watcher, cb, ctx); + try { + mockZooKeeper.overrideSessionId(getSessionId()); + mockZooKeeper.exists(path, watcher, cb, ctx); + } finally { + mockZooKeeper.removeSessionIdOverride(); + } } @Override public void sync(String path, VoidCallback cb, Object ctx) { - mockZooKeeper.sync(path, cb, ctx); + try { + mockZooKeeper.overrideSessionId(getSessionId()); + mockZooKeeper.sync(path, cb, ctx); + } finally { + mockZooKeeper.removeSessionIdOverride(); + } } @Override public Stat setData(final String path, byte[] data, int version) throws KeeperException, InterruptedException { - return mockZooKeeper.setData(path, data, version); + try { + mockZooKeeper.overrideSessionId(getSessionId()); + return mockZooKeeper.setData(path, data, version); + } finally { + mockZooKeeper.removeSessionIdOverride(); + } } @Override public void setData(final String path, final byte[] data, int version, final StatCallback cb, final Object ctx) { - mockZooKeeper.setData(path, data, version, cb, ctx); + try { + mockZooKeeper.overrideSessionId(getSessionId()); + mockZooKeeper.setData(path, data, version, cb, ctx); + } finally { + mockZooKeeper.removeSessionIdOverride(); + } } @Override public void delete(final String path, int version) throws InterruptedException, KeeperException { - mockZooKeeper.delete(path, version); + try { + mockZooKeeper.overrideSessionId(getSessionId()); + mockZooKeeper.delete(path, version); + } finally { + mockZooKeeper.removeSessionIdOverride(); + } } @Override public void delete(final String path, int version, final VoidCallback cb, final Object ctx) { - mockZooKeeper.delete(path, version, cb, ctx); + try { + mockZooKeeper.overrideSessionId(getSessionId()); + mockZooKeeper.delete(path, version, cb, ctx); + } finally { + mockZooKeeper.removeSessionIdOverride(); + } } @Override public void multi(Iterable<org.apache.zookeeper.Op> ops, AsyncCallback.MultiCallback cb, Object ctx) { - mockZooKeeper.multi(ops, cb, ctx); + try { + mockZooKeeper.overrideSessionId(getSessionId()); + mockZooKeeper.multi(ops, cb, ctx); + } finally { + mockZooKeeper.removeSessionIdOverride(); + } } @Override public List<OpResult> multi(Iterable<org.apache.zookeeper.Op> ops) throws InterruptedException, KeeperException { - return mockZooKeeper.multi(ops); + try { + mockZooKeeper.overrideSessionId(getSessionId()); + return mockZooKeeper.multi(ops); + } finally { + mockZooKeeper.removeSessionIdOverride(); + } } @Override public void addWatch(String basePath, Watcher watcher, AddWatchMode mode, VoidCallback cb, Object ctx) { - mockZooKeeper.addWatch(basePath, watcher, mode, cb, ctx); + try { + mockZooKeeper.overrideSessionId(getSessionId()); + mockZooKeeper.addWatch(basePath, watcher, mode, cb, ctx); + } finally { + mockZooKeeper.removeSessionIdOverride(); + } } @Override public void addWatch(String basePath, Watcher watcher, AddWatchMode mode) throws KeeperException, InterruptedException { - mockZooKeeper.addWatch(basePath, watcher, mode); + try { + mockZooKeeper.overrideSessionId(getSessionId()); + mockZooKeeper.addWatch(basePath, watcher, mode); + } finally { + mockZooKeeper.removeSessionIdOverride(); + } } @Override public void addWatch(String basePath, AddWatchMode mode) throws KeeperException, InterruptedException { - mockZooKeeper.addWatch(basePath, mode); + try { + mockZooKeeper.overrideSessionId(getSessionId()); + mockZooKeeper.addWatch(basePath, mode); + } finally { + mockZooKeeper.removeSessionIdOverride(); + } } @Override public void addWatch(String basePath, AddWatchMode mode, VoidCallback cb, Object ctx) { - mockZooKeeper.addWatch(basePath, mode, cb, ctx); + try { + mockZooKeeper.overrideSessionId(getSessionId()); + mockZooKeeper.addWatch(basePath, mode, cb, ctx); + } finally { + mockZooKeeper.removeSessionIdOverride(); + } } @Override public void close() throws InterruptedException { - if (closeMockZooKeeperOnClose) { - mockZooKeeper.close(); - } + internalClose(false); } public void shutdown() throws InterruptedException { - if (closeMockZooKeeperOnClose) { - mockZooKeeper.shutdown(); + internalClose(true); + } + + private void internalClose(boolean shutdown) throws InterruptedException { + try { + mockZooKeeper.overrideSessionId(getSessionId()); + mockZooKeeper.deleteEphemeralNodes(getSessionId()); + mockZooKeeper.deleteWatchers(getSessionId()); + if (closeMockZooKeeperOnClose) { + if (shutdown) { + mockZooKeeper.shutdown(); + } else { + mockZooKeeper.close(); + } + } + } finally { + mockZooKeeper.removeSessionIdOverride(); } }
