This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit de4f06b8f68efed1d11157f498115e7c47cf7c29 Author: Lari Hotari <lhot...@users.noreply.github.com> AuthorDate: Fri Apr 11 12:15:00 2025 +0300 [improve][test] Use configured session timeout for MockZooKeeper and TestZKServer in PulsarTestContext (#24171) (cherry picked from commit 4c85b4754c7ed4bdc365cf1ec08c6804df5b985e) --- .../broker/testcontext/PulsarTestContext.java | 160 +++++++++++++++------ .../MockZooKeeperMetadataStoreProvider.java | 1 + .../java/org/apache/zookeeper/MockZooKeeper.java | 8 +- .../org/apache/zookeeper/MockZooKeeperSession.java | 12 +- 4 files changed, 134 insertions(+), 47 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java index feb0be3d947..66d35d18392 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java @@ -218,6 +218,18 @@ public class PulsarTestContext implements AutoCloseable { getPulsarService()); } + private enum WithMockZooKeeperOrTestZKServer { + MOCKZOOKEEPER, MOCKZOOKEEPER_SEPARATE_GLOBAL, TEST_ZK_SERVER, TEST_ZK_SERVER_SEPARATE_GLOBAL; + + boolean isMockZooKeeper() { + return this == MOCKZOOKEEPER || this == MOCKZOOKEEPER_SEPARATE_GLOBAL; + } + + boolean isTestZKServer() { + return this == TEST_ZK_SERVER || this == TEST_ZK_SERVER_SEPARATE_GLOBAL; + } + } + /** * A builder for a PulsarTestContext. * @@ -233,6 +245,7 @@ public class PulsarTestContext implements AutoCloseable { protected boolean configOverrideCalled = false; protected Function<BrokerService, BrokerService> brokerServiceCustomizer = Function.identity(); + protected WithMockZooKeeperOrTestZKServer withMockZooKeeperOrTestZKServer; /** * Initialize the ServiceConfiguration with default values. @@ -379,11 +392,13 @@ public class PulsarTestContext implements AutoCloseable { public Builder reuseMockBookkeeperAndMetadataStores(PulsarTestContext otherContext) { bookKeeperClient(otherContext.getBookKeeperClient()); if (otherContext.getMockZooKeeper() != null) { + withMockZooKeeperOrTestZKServer = null; mockZooKeeper(otherContext.getMockZooKeeper()); if (otherContext.getMockZooKeeperGlobal() != null) { mockZooKeeperGlobal(otherContext.getMockZooKeeperGlobal()); } } else if (otherContext.getTestZKServer() != null) { + withMockZooKeeperOrTestZKServer = null; testZKServer(otherContext.getTestZKServer()); if (otherContext.getTestZKServerGlobal() != null) { testZKServerGlobal(otherContext.getTestZKServerGlobal()); @@ -435,31 +450,14 @@ public class PulsarTestContext implements AutoCloseable { * @return the builder */ public Builder withMockZookeeper(boolean useSeparateGlobalZk) { - try { - mockZooKeeper(createMockZooKeeper()); - if (useSeparateGlobalZk) { - mockZooKeeperGlobal(createMockZooKeeper()); - } - } catch (Exception e) { - throw new RuntimeException(e); + if (useSeparateGlobalZk) { + withMockZooKeeperOrTestZKServer = WithMockZooKeeperOrTestZKServer.MOCKZOOKEEPER_SEPARATE_GLOBAL; + } else { + withMockZooKeeperOrTestZKServer = WithMockZooKeeperOrTestZKServer.MOCKZOOKEEPER; } return this; } - private MockZooKeeper createMockZooKeeper() throws Exception { - MockZooKeeper zk = MockZooKeeper.newInstance(); - initializeZookeeper(zk); - registerCloseable(zk::shutdown); - return zk; - } - - private static void initializeZookeeper(ZooKeeper zk) throws KeeperException, InterruptedException { - ZkUtils.createFullPathOptimistic(zk, "/ledgers/available/192.168.1.1:" + 5000, - "".getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - - zk.create("/ledgers/LAYOUT", "1\nflat:1".getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT); - } /** * Configure this PulsarTestContext to use a test ZooKeeper instance which is @@ -478,27 +476,14 @@ public class PulsarTestContext implements AutoCloseable { * @return the builder */ public Builder withTestZookeeper(boolean useSeparateGlobalZk) { - try { - testZKServer(createTestZookeeper()); - if (useSeparateGlobalZk) { - testZKServerGlobal(createTestZookeeper()); - } - } catch (Exception e) { - throw new RuntimeException(e); + if (useSeparateGlobalZk) { + withMockZooKeeperOrTestZKServer = WithMockZooKeeperOrTestZKServer.TEST_ZK_SERVER_SEPARATE_GLOBAL; + } else { + withMockZooKeeperOrTestZKServer = WithMockZooKeeperOrTestZKServer.TEST_ZK_SERVER; } return this; } - private TestZKServer createTestZookeeper() throws Exception { - TestZKServer testZKServer = new TestZKServer(); - try (ZooKeeper zkc = new ZooKeeper(testZKServer.getConnectionString(), 5000, event -> { - })) { - initializeZookeeper(zkc); - } - registerCloseable(testZKServer); - return testZKServer; - } - /** * Applicable only when PulsarTestContext is not startable. This will configure mocks * for PulsarTestResources and related classes. @@ -586,6 +571,7 @@ public class PulsarTestContext implements AutoCloseable { if (configOverrideCustomizer != null) { configOverrideCustomizer.accept(super.config); } + createWithMockZooKeeperOrTestZKServerInstances(); if (super.brokerInterceptor != null) { super.config.setDisableBrokerInterceptors(false); } @@ -605,6 +591,73 @@ public class PulsarTestContext implements AutoCloseable { return super.build(); } + void createWithMockZooKeeperOrTestZKServerInstances() { + if (withMockZooKeeperOrTestZKServer == null) { + return; + } + int sessionTimeout = (int) super.config.getMetadataStoreSessionTimeoutMillis(); + try { + if (withMockZooKeeperOrTestZKServer.isMockZooKeeper()) { + if (super.mockZooKeeper == null) { + mockZooKeeper(createMockZooKeeper(sessionTimeout)); + } else { + log.warn("Skipping creating mockZooKeeper, already set"); + } + if (withMockZooKeeperOrTestZKServer + == WithMockZooKeeperOrTestZKServer.MOCKZOOKEEPER_SEPARATE_GLOBAL) { + if (super.mockZooKeeperGlobal == null) { + mockZooKeeperGlobal(createMockZooKeeper(sessionTimeout)); + } else { + log.warn("Skipping creating mockZooKeeperGlobal, already set"); + } + } + } else if (withMockZooKeeperOrTestZKServer.isTestZKServer()) { + if (super.testZKServer == null) { + testZKServer(createTestZookeeper(sessionTimeout)); + } else { + log.warn("Skipping creating testZKServer, already set"); + } + if (withMockZooKeeperOrTestZKServer + == WithMockZooKeeperOrTestZKServer.TEST_ZK_SERVER_SEPARATE_GLOBAL) { + if (super.testZKServerGlobal == null) { + testZKServerGlobal(createTestZookeeper(sessionTimeout)); + } else { + log.warn("Skipping creating testZKServerGlobal, already set"); + } + } + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private MockZooKeeper createMockZooKeeper(int sessionTimeout) throws Exception { + MockZooKeeper zk = MockZooKeeper.newInstance(); + zk.setSessionTimeout(sessionTimeout); + initializeZookeeper(zk); + registerCloseable(zk::shutdown); + return zk; + } + + // this might not be required at all, but it's kept here as an example + private static void initializeZookeeper(ZooKeeper zk) throws KeeperException, InterruptedException { + ZkUtils.createFullPathOptimistic(zk, "/ledgers/available/192.168.1.1:" + 5000, + "".getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + + zk.create("/ledgers/LAYOUT", "1\nflat:1".getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + } + + private TestZKServer createTestZookeeper(int sessionTimeout) throws Exception { + TestZKServer testZKServer = new TestZKServer(); + try (ZooKeeper zkc = new ZooKeeper(testZKServer.getConnectionString(), sessionTimeout, event -> { + })) { + initializeZookeeper(zkc); + } + registerCloseable(testZKServer); + return testZKServer; + } + protected void handlePreallocatePorts(ServiceConfiguration config) { if (super.preallocatePorts) { config.getBrokerServicePort().ifPresent(portNumber -> { @@ -666,28 +719,30 @@ public class PulsarTestContext implements AutoCloseable { if (super.localMetadataStore == null || super.configurationMetadataStore == null) { if (super.mockZooKeeper != null) { MetadataStoreExtended mockZookeeperMetadataStore = - createMockZookeeperMetadataStore(super.mockZooKeeper, MetadataStoreConfig.METADATA_STORE); + createMockZookeeperMetadataStore(super.mockZooKeeper, super.config, + MetadataStoreConfig.METADATA_STORE); if (super.localMetadataStore == null) { localMetadataStore(mockZookeeperMetadataStore); } if (super.configurationMetadataStore == null) { if (super.mockZooKeeperGlobal != null) { configurationMetadataStore(createMockZookeeperMetadataStore(super.mockZooKeeperGlobal, - MetadataStoreConfig.CONFIGURATION_METADATA_STORE)); + super.config, MetadataStoreConfig.CONFIGURATION_METADATA_STORE)); } else { configurationMetadataStore(mockZookeeperMetadataStore); } } } else if (super.testZKServer != null) { MetadataStoreExtended testZookeeperMetadataStore = - createTestZookeeperMetadataStore(super.testZKServer, MetadataStoreConfig.METADATA_STORE); + createTestZookeeperMetadataStore(super.testZKServer, super.config, + MetadataStoreConfig.METADATA_STORE); if (super.localMetadataStore == null) { localMetadataStore(testZookeeperMetadataStore); } if (super.configurationMetadataStore == null) { if (super.testZKServerGlobal != null) { configurationMetadataStore(createTestZookeeperMetadataStore(super.testZKServerGlobal, - MetadataStoreConfig.CONFIGURATION_METADATA_STORE)); + super.config, MetadataStoreConfig.CONFIGURATION_METADATA_STORE)); } else { configurationMetadataStore(testZookeeperMetadataStore); } @@ -717,16 +772,30 @@ public class PulsarTestContext implements AutoCloseable { } } + private MetadataStoreConfig createMetadataStoreConfig(ServiceConfiguration config, String metadataStoreName) { + return MetadataStoreConfig.builder() + .sessionTimeoutMillis((int) config.getMetadataStoreSessionTimeoutMillis()) + .allowReadOnlyOperations(config.isMetadataStoreAllowReadOnlyOperations()) + .batchingEnabled(config.isMetadataStoreBatchingEnabled()) + .batchingMaxDelayMillis(config.getMetadataStoreBatchingMaxDelayMillis()) + .batchingMaxOperations(config.getMetadataStoreBatchingMaxOperations()) + .batchingMaxSizeKb(config.getMetadataStoreBatchingMaxSizeKb()) + .metadataStoreName(metadataStoreName) + .build(); + } + private MetadataStoreExtended createMockZookeeperMetadataStore(MockZooKeeper mockZooKeeper, + ServiceConfiguration config, String metadataStoreName) { // provide a unique session id for each instance MockZooKeeperSession mockZooKeeperSession = MockZooKeeperSession.newInstance(mockZooKeeper, false); + mockZooKeeperSession.setSessionTimeout((int) config.getMetadataStoreSessionTimeoutMillis()); registerCloseable(() -> { mockZooKeeperSession.close(); resetSpyOrMock(mockZooKeeperSession); }); - ZKMetadataStore zkMetadataStore = new ZKMetadataStore(mockZooKeeperSession, - MetadataStoreConfig.builder().metadataStoreName(metadataStoreName).build()); + ZKMetadataStore zkMetadataStore = + new ZKMetadataStore(mockZooKeeperSession, createMetadataStoreConfig(config, metadataStoreName)); registerCloseable(() -> { zkMetadataStore.close(); resetSpyOrMock(zkMetadataStore); @@ -738,9 +807,10 @@ public class PulsarTestContext implements AutoCloseable { @SneakyThrows private MetadataStoreExtended createTestZookeeperMetadataStore(TestZKServer zkServer, + ServiceConfiguration config, String metadataStoreName) { MetadataStoreExtended store = MetadataStoreExtended.create("zk:" + zkServer.getConnectionString(), - MetadataStoreConfig.builder().metadataStoreName(metadataStoreName).build()); + createMetadataStoreConfig(config, metadataStoreName)); registerCloseable(store); MetadataStoreExtended nonClosingProxy = NonClosingProxyHandler.createNonClosingProxy(store, MetadataStoreExtended.class); diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MockZooKeeperMetadataStoreProvider.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MockZooKeeperMetadataStoreProvider.java index 994a97c2b10..e507ae45445 100644 --- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MockZooKeeperMetadataStoreProvider.java +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MockZooKeeperMetadataStoreProvider.java @@ -43,6 +43,7 @@ public class MockZooKeeperMetadataStoreProvider implements MetadataStoreProvider MockZooKeeper mockZooKeeper = mockZooKeepers.computeIfAbsent(metadataURL, k -> MockZooKeeper.newInstance().registerCloseable(() -> mockZooKeepers.remove(k))); MockZooKeeperSession mockZooKeeperSession = MockZooKeeperSession.newInstance(mockZooKeeper, true); + mockZooKeeperSession.setSessionTimeout(metadataStoreConfig.getSessionTimeoutMillis()); ZKMetadataStore zkMetadataStore = new ZKMetadataStore(mockZooKeeperSession, metadataStoreConfig, true); return zkMetadataStore; } diff --git a/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeper.java b/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeper.java index e124699ee13..2682f038df2 100644 --- a/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeper.java +++ b/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeper.java @@ -137,6 +137,7 @@ public class MockZooKeeper extends ZooKeeper { private ThreadLocal<Boolean> inExecutorThreadLocal; private int referenceCount; private List<AutoCloseable> closeables; + private int sessionTimeout; //see details of Objenesis caching - http://objenesis.org/details.html //see supported jvms - https://github.com/easymock/objenesis/blob/master/SupportedJVMs.md @@ -188,6 +189,7 @@ public class MockZooKeeper extends ZooKeeper { zk.readOpDelayMs = readOpDelayMs; zk.sequentialIdGenerator = new AtomicLong(); zk.closeables = new ArrayList<>(); + zk.sessionTimeout = 30_000; return zk; } @@ -204,7 +206,11 @@ public class MockZooKeeper extends ZooKeeper { @Override public int getSessionTimeout() { - return 30_000; + return sessionTimeout; + } + + public void setSessionTimeout(int sessionTimeout) { + this.sessionTimeout = sessionTimeout; } private MockZooKeeper(String quorum) throws Exception { diff --git a/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeperSession.java b/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeperSession.java index c812423b728..766f70979aa 100644 --- a/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeperSession.java +++ b/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeperSession.java @@ -48,6 +48,8 @@ public class MockZooKeeperSession extends ZooKeeper { private boolean closeMockZooKeeperOnClose; + private int sessionTimeout = -1; + public static MockZooKeeperSession newInstance(MockZooKeeper mockZooKeeper) { return newInstance(mockZooKeeper, true); } @@ -74,7 +76,15 @@ public class MockZooKeeperSession extends ZooKeeper { @Override public int getSessionTimeout() { - return mockZooKeeper.getSessionTimeout(); + if (sessionTimeout > 0) { + return sessionTimeout; + } else { + return mockZooKeeper.getSessionTimeout(); + } + } + + public void setSessionTimeout(int sessionTimeout) { + this.sessionTimeout = sessionTimeout; } @Override