This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 3d455768a0be16dccbd2276f9ac596ff16f756fa Author: Lari Hotari <[email protected]> AuthorDate: Tue Apr 8 20:38:35 2025 +0300 [fix][test] Fix flaky BrokerServiceChaosTest (#24162) (cherry picked from commit d59664ca9ca7300a8a7bc3faa1869f623eaac8ae) --- .../broker/service/BkEnsemblesChaosTest.java | 4 +- .../broker/service/BrokerServiceChaosTest.java | 14 +++---- .../CanReconnectZKClientPulsarServiceBaseTest.java | 48 ++++++++++++++-------- 3 files changed, 39 insertions(+), 27 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesChaosTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesChaosTest.java index d49489d8a84..a1f0f8fa256 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesChaosTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesChaosTest.java @@ -55,13 +55,13 @@ public class BkEnsemblesChaosTest extends CanReconnectZKClientPulsarServiceBaseT for (int i = 0; i < numberOfBookies - 1; i++){ bkEnsemble.stopBK(i); } - makeLocalMetadataStoreKeepReconnect(); + startLocalMetadataStoreConnectionTermination(); for (int i = 0; i < numberOfBookies - 1; i++){ bkEnsemble.startBK(i); } // Sleep 100ms to lose the notifications of ZK node create. Thread.sleep(100); - stopLocalMetadataStoreAlwaysReconnect(); + stopLocalMetadataStoreConnectionTermination(); // Ensure broker still works. admin.topics().unload(topicName); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceChaosTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceChaosTest.java index 73db8513794..6313d72329d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceChaosTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceChaosTest.java @@ -20,8 +20,9 @@ package org.apache.pulsar.broker.service; import static org.testng.Assert.assertEquals; import java.nio.charset.StandardCharsets; -import java.util.UUID; +import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride; @@ -57,9 +58,10 @@ public class BrokerServiceChaosTest extends CanReconnectZKClientPulsarServiceBas public void testFetchPartitionedTopicMetadataWithCacheRefresh() throws Exception { final String configMetadataStoreConnectString = WhiteboxImpl.getInternalState(pulsar.getConfigurationMetadataStore(), "zkConnectString"); + @Cleanup final ZooKeeper anotherZKCli = new ZooKeeper(configMetadataStoreConnectString, 5000, null); // Set policy of auto create topic to PARTITIONED. - final String ns = defaultTenant + "/ns_" + UUID.randomUUID().toString().replaceAll("-", ""); + final String ns = BrokerTestUtil.newUniqueName(defaultTenant + "/ns"); final TopicName topicName1 = TopicName.get("persistent://" + ns + "/tp1"); final TopicName topicName2 = TopicName.get("persistent://" + ns + "/tp2"); admin.namespaces().createNamespace(ns); @@ -81,11 +83,11 @@ public class BrokerServiceChaosTest extends CanReconnectZKClientPulsarServiceBas // Create the partitioned metadata by another zk client. // Make a error to make the cache could not update. - makeLocalMetadataStoreKeepReconnect(); + startLocalMetadataStoreConnectionTermination(); anotherZKCli.create("/admin/partitioned-topics/" + ns + "/persistent/" + topicName2.getLocalName(), "{\"partitions\":3}".getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - stopLocalMetadataStoreAlwaysReconnect(); + stopLocalMetadataStoreConnectionTermination(); // Get the partitioned metadata from cache, there is 90% chance that partitions count of metadata is 0. PartitionedTopicMetadata partitionedTopicMetadata2 = @@ -97,9 +99,5 @@ public class BrokerServiceChaosTest extends CanReconnectZKClientPulsarServiceBas PartitionedTopicMetadata partitionedTopicMetadata3 = pulsar.getBrokerService().fetchPartitionedTopicMetadataAsync(topicName2, true).get(); assertEquals(partitionedTopicMetadata3.partitions, 3); - - // cleanup. - admin.topics().deletePartitionedTopic(topicName2.toString()); - anotherZKCli.close(); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CanReconnectZKClientPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CanReconnectZKClientPulsarServiceBaseTest.java index a1cb4abc4c3..683fa702b4e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CanReconnectZKClientPulsarServiceBaseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CanReconnectZKClientPulsarServiceBaseTest.java @@ -24,6 +24,7 @@ import java.net.URL; import java.nio.channels.SelectionKey; import java.util.Collections; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.PulsarService; @@ -58,7 +59,8 @@ public abstract class CanReconnectZKClientPulsarServiceBaseTest extends TestRetr protected PulsarClient client; protected ZooKeeper localZkOfBroker; protected Object localMetaDataStoreClientCnx; - protected final AtomicBoolean LocalMetadataStoreInReconnectFinishSignal = new AtomicBoolean(); + protected final AtomicBoolean connectionTerminationThreadKeepRunning = new AtomicBoolean(); + private volatile Thread connectionTerminationThread; protected void startZKAndBK() throws Exception { // Start ZK. @@ -88,25 +90,29 @@ public abstract class CanReconnectZKClientPulsarServiceBaseTest extends TestRetr client = PulsarClient.builder().serviceUrl(url.toString()).build(); } - protected void makeLocalMetadataStoreKeepReconnect() throws Exception { - if (!LocalMetadataStoreInReconnectFinishSignal.compareAndSet(false, true)) { - throw new RuntimeException("Local metadata store is already keeping reconnect"); + protected void startLocalMetadataStoreConnectionTermination() throws Exception { + if (!connectionTerminationThreadKeepRunning.compareAndSet(false, true)) { + throw new RuntimeException("Local metadata store connection is already being terminated"); } + CompletableFuture<Void> future = new CompletableFuture<>(); if (localMetaDataStoreClientCnx.getClass().getSimpleName().equals("ClientCnxnSocketNIO")) { - makeLocalMetadataStoreKeepReconnectNIO(); + startNIOImplTermination(future); } else { // ClientCnxnSocketNetty. - makeLocalMetadataStoreKeepReconnectNetty(); + startNettyImplTermination(future); } + // wait until connection is closed at least once + future.get(); } - protected void makeLocalMetadataStoreKeepReconnectNIO() { - new Thread(() -> { - while (LocalMetadataStoreInReconnectFinishSignal.get()) { + private void startNIOImplTermination(CompletableFuture<Void> future) { + connectionTerminationThread = new Thread(() -> { + while (connectionTerminationThreadKeepRunning.get()) { try { SelectionKey sockKey = WhiteboxImpl.getInternalState(localMetaDataStoreClientCnx, "sockKey"); if (sockKey != null) { sockKey.channel().close(); + future.complete(null); } // Prevents high cpu usage. Thread.sleep(5); @@ -114,16 +120,18 @@ public abstract class CanReconnectZKClientPulsarServiceBaseTest extends TestRetr log.error("Try close the ZK connection of local metadata store failed: {}", e.toString()); } } - }).start(); + }); + connectionTerminationThread.start(); } - protected void makeLocalMetadataStoreKeepReconnectNetty() { - new Thread(() -> { - while (LocalMetadataStoreInReconnectFinishSignal.get()) { + private void startNettyImplTermination(CompletableFuture<Void> future) { + connectionTerminationThread = new Thread(() -> { + while (connectionTerminationThreadKeepRunning.get()) { try { Channel channel = WhiteboxImpl.getInternalState(localMetaDataStoreClientCnx, "channel"); if (channel != null) { channel.close(); + future.complete(null); } // Prevents high cpu usage. Thread.sleep(5); @@ -131,11 +139,17 @@ public abstract class CanReconnectZKClientPulsarServiceBaseTest extends TestRetr log.error("Try close the ZK connection of local metadata store failed: {}", e.toString()); } } - }).start(); + }); + connectionTerminationThread.start(); } - protected void stopLocalMetadataStoreAlwaysReconnect() { - LocalMetadataStoreInReconnectFinishSignal.set(false); + protected void stopLocalMetadataStoreConnectionTermination() throws InterruptedException { + connectionTerminationThreadKeepRunning.set(false); + if (connectionTerminationThread != null) { + // Wait for the reconnect thread to finish. + connectionTerminationThread.join(); + connectionTerminationThread = null; + } } protected void createDefaultTenantsAndClustersAndNamespace() throws Exception { @@ -195,7 +209,7 @@ public abstract class CanReconnectZKClientPulsarServiceBaseTest extends TestRetr markCurrentSetupNumberCleaned(); log.info("--- Shutting down ---"); - stopLocalMetadataStoreAlwaysReconnect(); + stopLocalMetadataStoreConnectionTermination(); // Stop brokers. if (client != null) {
