This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 57f955792d362212ae8755816b49398df66bdfa1 Author: Zixuan Liu <[email protected]> AuthorDate: Wed Apr 9 12:10:38 2025 +0800 [fix][test] Fix flaky BrokerServiceChaosTest.testFetchPartitionedTopicMetadataWithCacheRefresh (#24161) Signed-off-by: Zixuan Liu <[email protected]> Co-authored-by: Lari Hotari <[email protected]> (cherry picked from commit 9bcad7cc010348df252b065c59fae42915aca80b) --- .../broker/service/CanReconnectZKClientPulsarServiceBaseTest.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CanReconnectZKClientPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CanReconnectZKClientPulsarServiceBaseTest.java index 683fa702b4e..5a8f946fe2c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CanReconnectZKClientPulsarServiceBaseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CanReconnectZKClientPulsarServiceBaseTest.java @@ -34,12 +34,14 @@ import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.policies.data.TopicType; +import org.apache.pulsar.metadata.api.extended.SessionEvent; import org.apache.pulsar.metadata.impl.ZKMetadataStore; import org.apache.pulsar.tests.TestRetrySupport; import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble; import org.apache.pulsar.zookeeper.ZookeeperServerTest; import org.apache.zookeeper.ClientCnxn; import org.apache.zookeeper.ZooKeeper; +import org.awaitility.Awaitility; import org.awaitility.reflect.WhiteboxImpl; @Slf4j @@ -58,6 +60,7 @@ public abstract class CanReconnectZKClientPulsarServiceBaseTest extends TestRetr protected PulsarAdmin admin; protected PulsarClient client; protected ZooKeeper localZkOfBroker; + protected volatile SessionEvent sessionEvent; protected Object localMetaDataStoreClientCnx; protected final AtomicBoolean connectionTerminationThreadKeepRunning = new AtomicBoolean(); private volatile Thread connectionTerminationThread; @@ -80,6 +83,10 @@ public abstract class CanReconnectZKClientPulsarServiceBaseTest extends TestRetr broker = pulsar.getBrokerService(); ZKMetadataStore zkMetadataStore = (ZKMetadataStore) pulsar.getLocalMetadataStore(); localZkOfBroker = zkMetadataStore.getZkClient(); + zkMetadataStore.registerSessionListener(n -> { + log.info("Received session event: {}", n); + sessionEvent = n; + }); ClientCnxn cnxn = WhiteboxImpl.getInternalState(localZkOfBroker, "cnxn"); Object sendThread = WhiteboxImpl.getInternalState(cnxn, "sendThread"); localMetaDataStoreClientCnx = WhiteboxImpl.getInternalState(sendThread, "clientCnxnSocket"); @@ -150,6 +157,7 @@ public abstract class CanReconnectZKClientPulsarServiceBaseTest extends TestRetr connectionTerminationThread.join(); connectionTerminationThread = null; } + Awaitility.await().until(() -> SessionEvent.Reconnected.equals(sessionEvent)); } protected void createDefaultTenantsAndClustersAndNamespace() throws Exception {
