This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit de4f06b8f68efed1d11157f498115e7c47cf7c29
Author: Lari Hotari <lhot...@users.noreply.github.com>
AuthorDate: Fri Apr 11 12:15:00 2025 +0300

    [improve][test] Use configured session timeout for MockZooKeeper and TestZKServer in PulsarTestContext (#24171)
    
    (cherry picked from commit 4c85b4754c7ed4bdc365cf1ec08c6804df5b985e)
---
 .../broker/testcontext/PulsarTestContext.java      | 160 +++++++++++++++------
 .../MockZooKeeperMetadataStoreProvider.java        |   1 +
 .../java/org/apache/zookeeper/MockZooKeeper.java   |   8 +-
 .../org/apache/zookeeper/MockZooKeeperSession.java |  12 +-
 4 files changed, 134 insertions(+), 47 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java
index feb0be3d947..66d35d18392 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java
@@ -218,6 +218,18 @@ public class PulsarTestContext implements AutoCloseable {
                 getPulsarService());
     }
 
+    private enum WithMockZooKeeperOrTestZKServer {
+        MOCKZOOKEEPER, MOCKZOOKEEPER_SEPARATE_GLOBAL, TEST_ZK_SERVER, 
TEST_ZK_SERVER_SEPARATE_GLOBAL;
+
+        boolean isMockZooKeeper() {
+            return this == MOCKZOOKEEPER || this == 
MOCKZOOKEEPER_SEPARATE_GLOBAL;
+        }
+
+        boolean isTestZKServer() {
+            return this == TEST_ZK_SERVER || this == 
TEST_ZK_SERVER_SEPARATE_GLOBAL;
+        }
+    }
+
     /**
      * A builder for a PulsarTestContext.
      *
@@ -233,6 +245,7 @@ public class PulsarTestContext implements AutoCloseable {
 
         protected boolean configOverrideCalled = false;
         protected Function<BrokerService, BrokerService> 
brokerServiceCustomizer = Function.identity();
+        protected WithMockZooKeeperOrTestZKServer 
withMockZooKeeperOrTestZKServer;
 
         /**
          * Initialize the ServiceConfiguration with default values.
@@ -379,11 +392,13 @@ public class PulsarTestContext implements AutoCloseable {
         public Builder reuseMockBookkeeperAndMetadataStores(PulsarTestContext 
otherContext) {
             bookKeeperClient(otherContext.getBookKeeperClient());
             if (otherContext.getMockZooKeeper() != null) {
+                withMockZooKeeperOrTestZKServer = null;
                 mockZooKeeper(otherContext.getMockZooKeeper());
                 if (otherContext.getMockZooKeeperGlobal() != null) {
                     mockZooKeeperGlobal(otherContext.getMockZooKeeperGlobal());
                 }
             } else if (otherContext.getTestZKServer() != null) {
+                withMockZooKeeperOrTestZKServer = null;
                 testZKServer(otherContext.getTestZKServer());
                 if (otherContext.getTestZKServerGlobal() != null) {
                     testZKServerGlobal(otherContext.getTestZKServerGlobal());
@@ -435,31 +450,14 @@ public class PulsarTestContext implements AutoCloseable {
          * @return the builder
          */
         public Builder withMockZookeeper(boolean useSeparateGlobalZk) {
-            try {
-                mockZooKeeper(createMockZooKeeper());
-                if (useSeparateGlobalZk) {
-                    mockZooKeeperGlobal(createMockZooKeeper());
-                }
-            } catch (Exception e) {
-                throw new RuntimeException(e);
+            if (useSeparateGlobalZk) {
+                withMockZooKeeperOrTestZKServer = 
WithMockZooKeeperOrTestZKServer.MOCKZOOKEEPER_SEPARATE_GLOBAL;
+            } else {
+                withMockZooKeeperOrTestZKServer = 
WithMockZooKeeperOrTestZKServer.MOCKZOOKEEPER;
             }
             return this;
         }
 
-        private MockZooKeeper createMockZooKeeper() throws Exception {
-            MockZooKeeper zk = MockZooKeeper.newInstance();
-            initializeZookeeper(zk);
-            registerCloseable(zk::shutdown);
-            return zk;
-        }
-
-        private static void initializeZookeeper(ZooKeeper zk) throws 
KeeperException, InterruptedException {
-            ZkUtils.createFullPathOptimistic(zk, 
"/ledgers/available/192.168.1.1:" + 5000,
-                    "".getBytes(StandardCharsets.UTF_8), 
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-
-            zk.create("/ledgers/LAYOUT", 
"1\nflat:1".getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE,
-                    CreateMode.PERSISTENT);
-        }
 
         /**
          * Configure this PulsarTestContext to use a test ZooKeeper instance 
which is
@@ -478,27 +476,14 @@ public class PulsarTestContext implements AutoCloseable {
          * @return the builder
          */
         public Builder withTestZookeeper(boolean useSeparateGlobalZk) {
-            try {
-                testZKServer(createTestZookeeper());
-                if (useSeparateGlobalZk) {
-                    testZKServerGlobal(createTestZookeeper());
-                }
-            } catch (Exception e) {
-                throw new RuntimeException(e);
+            if (useSeparateGlobalZk) {
+                withMockZooKeeperOrTestZKServer = 
WithMockZooKeeperOrTestZKServer.TEST_ZK_SERVER_SEPARATE_GLOBAL;
+            } else {
+                withMockZooKeeperOrTestZKServer = 
WithMockZooKeeperOrTestZKServer.TEST_ZK_SERVER;
             }
             return this;
         }
 
-        private TestZKServer createTestZookeeper() throws Exception {
-            TestZKServer testZKServer = new TestZKServer();
-            try (ZooKeeper zkc = new 
ZooKeeper(testZKServer.getConnectionString(), 5000, event -> {
-            })) {
-                initializeZookeeper(zkc);
-            }
-            registerCloseable(testZKServer);
-            return testZKServer;
-        }
-
         /**
          * Applicable only when PulsarTestContext is not startable. This will 
configure mocks
          * for PulsarTestResources and related classes.
@@ -586,6 +571,7 @@ public class PulsarTestContext implements AutoCloseable {
             if (configOverrideCustomizer != null) {
                 configOverrideCustomizer.accept(super.config);
             }
+            createWithMockZooKeeperOrTestZKServerInstances();
             if (super.brokerInterceptor != null) {
                 super.config.setDisableBrokerInterceptors(false);
             }
@@ -605,6 +591,73 @@ public class PulsarTestContext implements AutoCloseable {
             return super.build();
         }
 
+        void createWithMockZooKeeperOrTestZKServerInstances() {
+            if (withMockZooKeeperOrTestZKServer == null) {
+                return;
+            }
+            int sessionTimeout = (int) 
super.config.getMetadataStoreSessionTimeoutMillis();
+            try {
+                if (withMockZooKeeperOrTestZKServer.isMockZooKeeper()) {
+                    if (super.mockZooKeeper == null) {
+                        mockZooKeeper(createMockZooKeeper(sessionTimeout));
+                    } else {
+                        log.warn("Skipping creating mockZooKeeper, already 
set");
+                    }
+                    if (withMockZooKeeperOrTestZKServer
+                            == 
WithMockZooKeeperOrTestZKServer.MOCKZOOKEEPER_SEPARATE_GLOBAL) {
+                        if (super.mockZooKeeperGlobal == null) {
+                            
mockZooKeeperGlobal(createMockZooKeeper(sessionTimeout));
+                        } else {
+                            log.warn("Skipping creating mockZooKeeperGlobal, 
already set");
+                        }
+                    }
+                } else if (withMockZooKeeperOrTestZKServer.isTestZKServer()) {
+                    if (super.testZKServer == null) {
+                        testZKServer(createTestZookeeper(sessionTimeout));
+                    } else {
+                        log.warn("Skipping creating testZKServer, already 
set");
+                    }
+                    if (withMockZooKeeperOrTestZKServer
+                            == 
WithMockZooKeeperOrTestZKServer.TEST_ZK_SERVER_SEPARATE_GLOBAL) {
+                        if (super.testZKServerGlobal == null) {
+                            
testZKServerGlobal(createTestZookeeper(sessionTimeout));
+                        } else {
+                            log.warn("Skipping creating testZKServerGlobal, 
already set");
+                        }
+                    }
+                }
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        private MockZooKeeper createMockZooKeeper(int sessionTimeout) throws 
Exception {
+            MockZooKeeper zk = MockZooKeeper.newInstance();
+            zk.setSessionTimeout(sessionTimeout);
+            initializeZookeeper(zk);
+            registerCloseable(zk::shutdown);
+            return zk;
+        }
+
+        // this might not be required at all, but it's kept here as an example
+        private static void initializeZookeeper(ZooKeeper zk) throws 
KeeperException, InterruptedException {
+            ZkUtils.createFullPathOptimistic(zk, 
"/ledgers/available/192.168.1.1:" + 5000,
+                    "".getBytes(StandardCharsets.UTF_8), 
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+            zk.create("/ledgers/LAYOUT", 
"1\nflat:1".getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                    CreateMode.PERSISTENT);
+        }
+
+        private TestZKServer createTestZookeeper(int sessionTimeout) throws 
Exception {
+            TestZKServer testZKServer = new TestZKServer();
+            try (ZooKeeper zkc = new 
ZooKeeper(testZKServer.getConnectionString(), sessionTimeout, event -> {
+            })) {
+                initializeZookeeper(zkc);
+            }
+            registerCloseable(testZKServer);
+            return testZKServer;
+        }
+
         protected void handlePreallocatePorts(ServiceConfiguration config) {
             if (super.preallocatePorts) {
                 config.getBrokerServicePort().ifPresent(portNumber -> {
@@ -666,28 +719,30 @@ public class PulsarTestContext implements AutoCloseable {
             if (super.localMetadataStore == null || 
super.configurationMetadataStore == null) {
                 if (super.mockZooKeeper != null) {
                     MetadataStoreExtended mockZookeeperMetadataStore =
-                            
createMockZookeeperMetadataStore(super.mockZooKeeper, 
MetadataStoreConfig.METADATA_STORE);
+                            
createMockZookeeperMetadataStore(super.mockZooKeeper, super.config,
+                                    MetadataStoreConfig.METADATA_STORE);
                     if (super.localMetadataStore == null) {
                         localMetadataStore(mockZookeeperMetadataStore);
                     }
                     if (super.configurationMetadataStore == null) {
                         if (super.mockZooKeeperGlobal != null) {
                             
configurationMetadataStore(createMockZookeeperMetadataStore(super.mockZooKeeperGlobal,
-                                    
MetadataStoreConfig.CONFIGURATION_METADATA_STORE));
+                                    super.config, 
MetadataStoreConfig.CONFIGURATION_METADATA_STORE));
                         } else {
                             
configurationMetadataStore(mockZookeeperMetadataStore);
                         }
                     }
                 } else if (super.testZKServer != null) {
                     MetadataStoreExtended testZookeeperMetadataStore =
-                            
createTestZookeeperMetadataStore(super.testZKServer, 
MetadataStoreConfig.METADATA_STORE);
+                            
createTestZookeeperMetadataStore(super.testZKServer, super.config,
+                                    MetadataStoreConfig.METADATA_STORE);
                     if (super.localMetadataStore == null) {
                         localMetadataStore(testZookeeperMetadataStore);
                     }
                     if (super.configurationMetadataStore == null) {
                         if (super.testZKServerGlobal != null) {
                             
configurationMetadataStore(createTestZookeeperMetadataStore(super.testZKServerGlobal,
-                                    
MetadataStoreConfig.CONFIGURATION_METADATA_STORE));
+                                    super.config, 
MetadataStoreConfig.CONFIGURATION_METADATA_STORE));
                         } else {
                             
configurationMetadataStore(testZookeeperMetadataStore);
                         }
@@ -717,16 +772,30 @@ public class PulsarTestContext implements AutoCloseable {
             }
         }
 
+        private MetadataStoreConfig 
createMetadataStoreConfig(ServiceConfiguration config, String 
metadataStoreName) {
+            return MetadataStoreConfig.builder()
+                    .sessionTimeoutMillis((int) 
config.getMetadataStoreSessionTimeoutMillis())
+                    
.allowReadOnlyOperations(config.isMetadataStoreAllowReadOnlyOperations())
+                    .batchingEnabled(config.isMetadataStoreBatchingEnabled())
+                    
.batchingMaxDelayMillis(config.getMetadataStoreBatchingMaxDelayMillis())
+                    
.batchingMaxOperations(config.getMetadataStoreBatchingMaxOperations())
+                    
.batchingMaxSizeKb(config.getMetadataStoreBatchingMaxSizeKb())
+                    .metadataStoreName(metadataStoreName)
+                    .build();
+        }
+
         private MetadataStoreExtended 
createMockZookeeperMetadataStore(MockZooKeeper mockZooKeeper,
+                                                                       
ServiceConfiguration config,
                                                                        String 
metadataStoreName) {
             // provide a unique session id for each instance
             MockZooKeeperSession mockZooKeeperSession = 
MockZooKeeperSession.newInstance(mockZooKeeper, false);
+            mockZooKeeperSession.setSessionTimeout((int) 
config.getMetadataStoreSessionTimeoutMillis());
             registerCloseable(() -> {
                 mockZooKeeperSession.close();
                 resetSpyOrMock(mockZooKeeperSession);
             });
-            ZKMetadataStore zkMetadataStore = new 
ZKMetadataStore(mockZooKeeperSession,
-                    
MetadataStoreConfig.builder().metadataStoreName(metadataStoreName).build());
+            ZKMetadataStore zkMetadataStore =
+                    new ZKMetadataStore(mockZooKeeperSession, 
createMetadataStoreConfig(config, metadataStoreName));
             registerCloseable(() -> {
                 zkMetadataStore.close();
                 resetSpyOrMock(zkMetadataStore);
@@ -738,9 +807,10 @@ public class PulsarTestContext implements AutoCloseable {
 
         @SneakyThrows
         private MetadataStoreExtended 
createTestZookeeperMetadataStore(TestZKServer zkServer,
+                                                                       
ServiceConfiguration config,
                                                                        String 
metadataStoreName) {
             MetadataStoreExtended store = MetadataStoreExtended.create("zk:" + 
zkServer.getConnectionString(),
-                    
MetadataStoreConfig.builder().metadataStoreName(metadataStoreName).build());
+                    createMetadataStoreConfig(config, metadataStoreName));
             registerCloseable(store);
             MetadataStoreExtended nonClosingProxy =
                     NonClosingProxyHandler.createNonClosingProxy(store, 
MetadataStoreExtended.class);
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MockZooKeeperMetadataStoreProvider.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MockZooKeeperMetadataStoreProvider.java
index 994a97c2b10..e507ae45445 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MockZooKeeperMetadataStoreProvider.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MockZooKeeperMetadataStoreProvider.java
@@ -43,6 +43,7 @@ public class MockZooKeeperMetadataStoreProvider implements 
MetadataStoreProvider
         MockZooKeeper mockZooKeeper = 
mockZooKeepers.computeIfAbsent(metadataURL,
                 k -> MockZooKeeper.newInstance().registerCloseable(() -> 
mockZooKeepers.remove(k)));
         MockZooKeeperSession mockZooKeeperSession = 
MockZooKeeperSession.newInstance(mockZooKeeper, true);
+        
mockZooKeeperSession.setSessionTimeout(metadataStoreConfig.getSessionTimeoutMillis());
         ZKMetadataStore zkMetadataStore = new 
ZKMetadataStore(mockZooKeeperSession, metadataStoreConfig, true);
         return zkMetadataStore;
     }
diff --git a/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeper.java b/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeper.java
index e124699ee13..2682f038df2 100644
--- a/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeper.java
+++ b/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeper.java
@@ -137,6 +137,7 @@ public class MockZooKeeper extends ZooKeeper {
     private ThreadLocal<Boolean> inExecutorThreadLocal;
     private int referenceCount;
     private List<AutoCloseable> closeables;
+    private int sessionTimeout;
 
     //see details of Objenesis caching - http://objenesis.org/details.html
     //see supported jvms - 
https://github.com/easymock/objenesis/blob/master/SupportedJVMs.md
@@ -188,6 +189,7 @@ public class MockZooKeeper extends ZooKeeper {
         zk.readOpDelayMs = readOpDelayMs;
         zk.sequentialIdGenerator = new AtomicLong();
         zk.closeables = new ArrayList<>();
+        zk.sessionTimeout = 30_000;
         return zk;
     }
 
@@ -204,7 +206,11 @@ public class MockZooKeeper extends ZooKeeper {
 
     @Override
     public int getSessionTimeout() {
-        return 30_000;
+        return sessionTimeout;
+    }
+
+    public void setSessionTimeout(int sessionTimeout) {
+        this.sessionTimeout = sessionTimeout;
     }
 
     private MockZooKeeper(String quorum) throws Exception {
diff --git a/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeperSession.java b/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeperSession.java
index c812423b728..766f70979aa 100644
--- a/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeperSession.java
+++ b/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeperSession.java
@@ -48,6 +48,8 @@ public class MockZooKeeperSession extends ZooKeeper {
 
     private boolean closeMockZooKeeperOnClose;
 
+    private int sessionTimeout = -1;
+
     public static MockZooKeeperSession newInstance(MockZooKeeper 
mockZooKeeper) {
         return newInstance(mockZooKeeper, true);
     }
@@ -74,7 +76,15 @@ public class MockZooKeeperSession extends ZooKeeper {
 
     @Override
     public int getSessionTimeout() {
-        return mockZooKeeper.getSessionTimeout();
+        if (sessionTimeout > 0) {
+            return sessionTimeout;
+        } else {
+            return mockZooKeeper.getSessionTimeout();
+        }
+    }
+
+    public void setSessionTimeout(int sessionTimeout) {
+        this.sessionTimeout = sessionTimeout;
     }
 
     @Override

Reply via email to