This is an automated email from the ASF dual-hosted git repository.

zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 9bcad7cc010 [fix][test] Fix flaky 
BrokerServiceChaosTest.testFetchPartitionedTopicMetadataWithCacheRefresh 
(#24161)
9bcad7cc010 is described below

commit 9bcad7cc010348df252b065c59fae42915aca80b
Author: Zixuan Liu <[email protected]>
AuthorDate: Wed Apr 9 12:10:38 2025 +0800

    [fix][test] Fix flaky 
BrokerServiceChaosTest.testFetchPartitionedTopicMetadataWithCacheRefresh 
(#24161)
    
    Signed-off-by: Zixuan Liu <[email protected]>
    Co-authored-by: Lari Hotari <[email protected]>
---
 .../broker/service/CanReconnectZKClientPulsarServiceBaseTest.java | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CanReconnectZKClientPulsarServiceBaseTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CanReconnectZKClientPulsarServiceBaseTest.java
index dc41764ca21..b2db0e84749 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CanReconnectZKClientPulsarServiceBaseTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CanReconnectZKClientPulsarServiceBaseTest.java
@@ -35,12 +35,14 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.policies.data.TopicType;
+import org.apache.pulsar.metadata.api.extended.SessionEvent;
 import org.apache.pulsar.metadata.impl.ZKMetadataStore;
 import org.apache.pulsar.tests.TestRetrySupport;
 import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
 import org.apache.pulsar.zookeeper.ZookeeperServerTest;
 import org.apache.zookeeper.ClientCnxn;
 import org.apache.zookeeper.ZooKeeper;
+import org.awaitility.Awaitility;
 import org.awaitility.reflect.WhiteboxImpl;
 
 @Slf4j
@@ -65,6 +67,7 @@ public abstract class 
CanReconnectZKClientPulsarServiceBaseTest extends TestRetr
     protected PulsarAdmin admin;
     protected PulsarClient client;
     protected ZooKeeper localZkOfBroker;
+    protected volatile SessionEvent sessionEvent;
     protected Object localMetaDataStoreClientCnx;
     protected final AtomicBoolean connectionTerminationThreadKeepRunning = new 
AtomicBoolean();
     private volatile Thread connectionTerminationThread;
@@ -87,6 +90,10 @@ public abstract class 
CanReconnectZKClientPulsarServiceBaseTest extends TestRetr
         broker = pulsar.getBrokerService();
         ZKMetadataStore zkMetadataStore = (ZKMetadataStore) 
pulsar.getLocalMetadataStore();
         localZkOfBroker = zkMetadataStore.getZkClient();
+        zkMetadataStore.registerSessionListener(n -> {
+            log.info("Received session event: {}", n);
+            sessionEvent = n;
+        });
         ClientCnxn cnxn = WhiteboxImpl.getInternalState(localZkOfBroker, 
"cnxn");
         Object sendThread = WhiteboxImpl.getInternalState(cnxn, "sendThread");
         localMetaDataStoreClientCnx = 
WhiteboxImpl.getInternalState(sendThread, "clientCnxnSocket");
@@ -157,6 +164,7 @@ public abstract class 
CanReconnectZKClientPulsarServiceBaseTest extends TestRetr
             connectionTerminationThread.join();
             connectionTerminationThread = null;
         }
+        Awaitility.await().until(() -> 
SessionEvent.Reconnected.equals(sessionEvent));
     }
 
     protected void createDefaultTenantsAndClustersAndNamespace() throws 
Exception {

Reply via email to