This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new b8a7be48223 [fix][test] Fix flaky BrokerServiceChaosTest (#24162)
b8a7be48223 is described below

commit b8a7be48223740a80f40d79d78492fe27801e43c
Author: Lari Hotari <[email protected]>
AuthorDate: Tue Apr 8 20:38:35 2025 +0300

    [fix][test] Fix flaky BrokerServiceChaosTest (#24162)
---
 .../broker/service/BkEnsemblesChaosTest.java       |  4 +-
 .../broker/service/BrokerServiceChaosTest.java     | 14 +++----
 .../CanReconnectZKClientPulsarServiceBaseTest.java | 48 ++++++++++++++--------
 3 files changed, 39 insertions(+), 27 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesChaosTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesChaosTest.java
index d49489d8a84..a1f0f8fa256 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesChaosTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesChaosTest.java
@@ -55,13 +55,13 @@ public class BkEnsemblesChaosTest extends CanReconnectZKClientPulsarServiceBaseT
         for (int i = 0; i < numberOfBookies - 1; i++){
             bkEnsemble.stopBK(i);
         }
-        makeLocalMetadataStoreKeepReconnect();
+        startLocalMetadataStoreConnectionTermination();
         for (int i = 0; i < numberOfBookies - 1; i++){
             bkEnsemble.startBK(i);
         }
         // Sleep 100ms to lose the notifications of ZK node create.
         Thread.sleep(100);
-        stopLocalMetadataStoreAlwaysReconnect();
+        stopLocalMetadataStoreConnectionTermination();
 
         // Ensure broker still works.
         admin.topics().unload(topicName);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceChaosTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceChaosTest.java
index 73db8513794..6313d72329d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceChaosTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceChaosTest.java
@@ -20,8 +20,9 @@ package org.apache.pulsar.broker.service;
 
 import static org.testng.Assert.assertEquals;
 import java.nio.charset.StandardCharsets;
-import java.util.UUID;
+import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
@@ -57,9 +58,10 @@ public class BrokerServiceChaosTest extends CanReconnectZKClientPulsarServiceBas
     public void testFetchPartitionedTopicMetadataWithCacheRefresh() throws 
Exception {
         final String configMetadataStoreConnectString =
                 
WhiteboxImpl.getInternalState(pulsar.getConfigurationMetadataStore(), 
"zkConnectString");
+        @Cleanup
         final ZooKeeper anotherZKCli = new 
ZooKeeper(configMetadataStoreConnectString, 5000, null);
         // Set policy of auto create topic to PARTITIONED.
-        final String ns = defaultTenant + "/ns_" + 
UUID.randomUUID().toString().replaceAll("-", "");
+        final String ns = BrokerTestUtil.newUniqueName(defaultTenant + "/ns");
         final TopicName topicName1 = TopicName.get("persistent://" + ns + 
"/tp1");
         final TopicName topicName2 = TopicName.get("persistent://" + ns + 
"/tp2");
         admin.namespaces().createNamespace(ns);
@@ -81,11 +83,11 @@ public class BrokerServiceChaosTest extends CanReconnectZKClientPulsarServiceBas
 
         // Create the partitioned metadata by another zk client.
         // Make a error to make the cache could not update.
-        makeLocalMetadataStoreKeepReconnect();
+        startLocalMetadataStoreConnectionTermination();
         anotherZKCli.create("/admin/partitioned-topics/" + ns + "/persistent/" 
+ topicName2.getLocalName(),
                 "{\"partitions\":3}".getBytes(StandardCharsets.UTF_8),
                 ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-        stopLocalMetadataStoreAlwaysReconnect();
+        stopLocalMetadataStoreConnectionTermination();
 
         // Get the partitioned metadata from cache, there is 90% chance that 
partitions count of metadata is 0.
         PartitionedTopicMetadata partitionedTopicMetadata2 =
@@ -97,9 +99,5 @@ public class BrokerServiceChaosTest extends CanReconnectZKClientPulsarServiceBas
         PartitionedTopicMetadata partitionedTopicMetadata3 =
                 
pulsar.getBrokerService().fetchPartitionedTopicMetadataAsync(topicName2, 
true).get();
         assertEquals(partitionedTopicMetadata3.partitions, 3);
-
-        // cleanup.
-        admin.topics().deletePartitionedTopic(topicName2.toString());
-        anotherZKCli.close();
     }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CanReconnectZKClientPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CanReconnectZKClientPulsarServiceBaseTest.java
index 787b4d3154e..dc41764ca21 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CanReconnectZKClientPulsarServiceBaseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CanReconnectZKClientPulsarServiceBaseTest.java
@@ -25,6 +25,7 @@ import java.net.URL;
 import java.nio.channels.SelectionKey;
 import java.util.Collections;
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.PulsarService;
@@ -65,7 +66,8 @@ public abstract class CanReconnectZKClientPulsarServiceBaseTest extends TestRetr
     protected PulsarClient client;
     protected ZooKeeper localZkOfBroker;
     protected Object localMetaDataStoreClientCnx;
-    protected final AtomicBoolean LocalMetadataStoreInReconnectFinishSignal = 
new AtomicBoolean();
+    protected final AtomicBoolean connectionTerminationThreadKeepRunning = new 
AtomicBoolean();
+    private volatile Thread connectionTerminationThread;
 
     protected void startZKAndBK() throws Exception {
         // Start ZK.
@@ -95,25 +97,29 @@ public abstract class CanReconnectZKClientPulsarServiceBaseTest extends TestRetr
         client = PulsarClient.builder().serviceUrl(url.toString()).build();
     }
 
-    protected void makeLocalMetadataStoreKeepReconnect() throws Exception {
-        if (!LocalMetadataStoreInReconnectFinishSignal.compareAndSet(false, 
true)) {
-            throw new RuntimeException("Local metadata store is already 
keeping reconnect");
+    protected void startLocalMetadataStoreConnectionTermination() throws 
Exception {
+        if (!connectionTerminationThreadKeepRunning.compareAndSet(false, 
true)) {
+            throw new RuntimeException("Local metadata store connection is 
already being terminated");
         }
+        CompletableFuture<Void> future = new CompletableFuture<>();
         if 
(localMetaDataStoreClientCnx.getClass().getSimpleName().equals("ClientCnxnSocketNIO"))
 {
-            makeLocalMetadataStoreKeepReconnectNIO();
+            startNIOImplTermination(future);
         } else {
             // ClientCnxnSocketNetty.
-            makeLocalMetadataStoreKeepReconnectNetty();
+            startNettyImplTermination(future);
         }
+        // wait until connection is closed at least once
+        future.get();
     }
 
-    protected void makeLocalMetadataStoreKeepReconnectNIO() {
-        new Thread(() -> {
-            while (LocalMetadataStoreInReconnectFinishSignal.get()) {
+    private void startNIOImplTermination(CompletableFuture<Void> future) {
+        connectionTerminationThread = new Thread(() -> {
+            while (connectionTerminationThreadKeepRunning.get()) {
                 try {
                     SelectionKey sockKey = 
WhiteboxImpl.getInternalState(localMetaDataStoreClientCnx, "sockKey");
                     if (sockKey != null) {
                         sockKey.channel().close();
+                        future.complete(null);
                     }
                     // Prevents high cpu usage.
                     Thread.sleep(5);
@@ -121,16 +127,18 @@ public abstract class CanReconnectZKClientPulsarServiceBaseTest extends TestRetr
                     log.error("Try close the ZK connection of local metadata 
store failed: {}", e.toString());
                 }
             }
-        }).start();
+        });
+        connectionTerminationThread.start();
     }
 
-    protected void makeLocalMetadataStoreKeepReconnectNetty() {
-        new Thread(() -> {
-            while (LocalMetadataStoreInReconnectFinishSignal.get()) {
+    private void startNettyImplTermination(CompletableFuture<Void> future) {
+        connectionTerminationThread = new Thread(() -> {
+            while (connectionTerminationThreadKeepRunning.get()) {
                 try {
                     Channel channel = 
WhiteboxImpl.getInternalState(localMetaDataStoreClientCnx, "channel");
                     if (channel != null) {
                         channel.close();
+                        future.complete(null);
                     }
                     // Prevents high cpu usage.
                     Thread.sleep(5);
@@ -138,11 +146,17 @@ public abstract class CanReconnectZKClientPulsarServiceBaseTest extends TestRetr
                     log.error("Try close the ZK connection of local metadata 
store failed: {}", e.toString());
                 }
             }
-        }).start();
+        });
+        connectionTerminationThread.start();
     }
 
-    protected void stopLocalMetadataStoreAlwaysReconnect() {
-        LocalMetadataStoreInReconnectFinishSignal.set(false);
+    protected void stopLocalMetadataStoreConnectionTermination() throws 
InterruptedException {
+        connectionTerminationThreadKeepRunning.set(false);
+        if (connectionTerminationThread != null) {
+            // Wait for the reconnect thread to finish.
+            connectionTerminationThread.join();
+            connectionTerminationThread = null;
+        }
     }
 
     protected void createDefaultTenantsAndClustersAndNamespace() throws 
Exception {
@@ -205,7 +219,7 @@ public abstract class CanReconnectZKClientPulsarServiceBaseTest extends TestRetr
         markCurrentSetupNumberCleaned();
         log.info("--- Shutting down ---");
 
-        stopLocalMetadataStoreAlwaysReconnect();
+        stopLocalMetadataStoreConnectionTermination();
 
         // Stop brokers.
         if (client != null) {

Reply via email to