This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new b8a7be48223 [fix][test] Fix flaky BrokerServiceChaosTest (#24162)
b8a7be48223 is described below
commit b8a7be48223740a80f40d79d78492fe27801e43c
Author: Lari Hotari <[email protected]>
AuthorDate: Tue Apr 8 20:38:35 2025 +0300
[fix][test] Fix flaky BrokerServiceChaosTest (#24162)
---
.../broker/service/BkEnsemblesChaosTest.java | 4 +-
.../broker/service/BrokerServiceChaosTest.java | 14 +++----
.../CanReconnectZKClientPulsarServiceBaseTest.java | 48 ++++++++++++++--------
3 files changed, 39 insertions(+), 27 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesChaosTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesChaosTest.java
index d49489d8a84..a1f0f8fa256 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesChaosTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesChaosTest.java
@@ -55,13 +55,13 @@ public class BkEnsemblesChaosTest extends CanReconnectZKClientPulsarServiceBaseT
for (int i = 0; i < numberOfBookies - 1; i++){
bkEnsemble.stopBK(i);
}
- makeLocalMetadataStoreKeepReconnect();
+ startLocalMetadataStoreConnectionTermination();
for (int i = 0; i < numberOfBookies - 1; i++){
bkEnsemble.startBK(i);
}
// Sleep 100ms to lose the notifications of ZK node create.
Thread.sleep(100);
- stopLocalMetadataStoreAlwaysReconnect();
+ stopLocalMetadataStoreConnectionTermination();
// Ensure broker still works.
admin.topics().unload(topicName);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceChaosTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceChaosTest.java
index 73db8513794..6313d72329d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceChaosTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceChaosTest.java
@@ -20,8 +20,9 @@ package org.apache.pulsar.broker.service;
import static org.testng.Assert.assertEquals;
import java.nio.charset.StandardCharsets;
-import java.util.UUID;
+import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
@@ -57,9 +58,10 @@ public class BrokerServiceChaosTest extends CanReconnectZKClientPulsarServiceBas
public void testFetchPartitionedTopicMetadataWithCacheRefresh() throws
Exception {
final String configMetadataStoreConnectString =
WhiteboxImpl.getInternalState(pulsar.getConfigurationMetadataStore(),
"zkConnectString");
+ @Cleanup
final ZooKeeper anotherZKCli = new
ZooKeeper(configMetadataStoreConnectString, 5000, null);
// Set policy of auto create topic to PARTITIONED.
- final String ns = defaultTenant + "/ns_" +
UUID.randomUUID().toString().replaceAll("-", "");
+ final String ns = BrokerTestUtil.newUniqueName(defaultTenant + "/ns");
final TopicName topicName1 = TopicName.get("persistent://" + ns +
"/tp1");
final TopicName topicName2 = TopicName.get("persistent://" + ns +
"/tp2");
admin.namespaces().createNamespace(ns);
@@ -81,11 +83,11 @@ public class BrokerServiceChaosTest extends CanReconnectZKClientPulsarServiceBas
// Create the partitioned metadata by another zk client.
// Make a error to make the cache could not update.
- makeLocalMetadataStoreKeepReconnect();
+ startLocalMetadataStoreConnectionTermination();
anotherZKCli.create("/admin/partitioned-topics/" + ns + "/persistent/"
+ topicName2.getLocalName(),
"{\"partitions\":3}".getBytes(StandardCharsets.UTF_8),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- stopLocalMetadataStoreAlwaysReconnect();
+ stopLocalMetadataStoreConnectionTermination();
// Get the partitioned metadata from cache, there is 90% chance that
partitions count of metadata is 0.
PartitionedTopicMetadata partitionedTopicMetadata2 =
@@ -97,9 +99,5 @@ public class BrokerServiceChaosTest extends CanReconnectZKClientPulsarServiceBas
PartitionedTopicMetadata partitionedTopicMetadata3 =
pulsar.getBrokerService().fetchPartitionedTopicMetadataAsync(topicName2,
true).get();
assertEquals(partitionedTopicMetadata3.partitions, 3);
-
- // cleanup.
- admin.topics().deletePartitionedTopic(topicName2.toString());
- anotherZKCli.close();
}
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CanReconnectZKClientPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CanReconnectZKClientPulsarServiceBaseTest.java
index 787b4d3154e..dc41764ca21 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CanReconnectZKClientPulsarServiceBaseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CanReconnectZKClientPulsarServiceBaseTest.java
@@ -25,6 +25,7 @@ import java.net.URL;
import java.nio.channels.SelectionKey;
import java.util.Collections;
import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.PulsarService;
@@ -65,7 +66,8 @@ public abstract class CanReconnectZKClientPulsarServiceBaseTest extends TestRetr
protected PulsarClient client;
protected ZooKeeper localZkOfBroker;
protected Object localMetaDataStoreClientCnx;
- protected final AtomicBoolean LocalMetadataStoreInReconnectFinishSignal =
new AtomicBoolean();
+ protected final AtomicBoolean connectionTerminationThreadKeepRunning = new
AtomicBoolean();
+ private volatile Thread connectionTerminationThread;
protected void startZKAndBK() throws Exception {
// Start ZK.
@@ -95,25 +97,29 @@ public abstract class CanReconnectZKClientPulsarServiceBaseTest extends TestRetr
client = PulsarClient.builder().serviceUrl(url.toString()).build();
}
- protected void makeLocalMetadataStoreKeepReconnect() throws Exception {
- if (!LocalMetadataStoreInReconnectFinishSignal.compareAndSet(false,
true)) {
- throw new RuntimeException("Local metadata store is already
keeping reconnect");
+ protected void startLocalMetadataStoreConnectionTermination() throws
Exception {
+ if (!connectionTerminationThreadKeepRunning.compareAndSet(false,
true)) {
+ throw new RuntimeException("Local metadata store connection is
already being terminated");
}
+ CompletableFuture<Void> future = new CompletableFuture<>();
if
(localMetaDataStoreClientCnx.getClass().getSimpleName().equals("ClientCnxnSocketNIO"))
{
- makeLocalMetadataStoreKeepReconnectNIO();
+ startNIOImplTermination(future);
} else {
// ClientCnxnSocketNetty.
- makeLocalMetadataStoreKeepReconnectNetty();
+ startNettyImplTermination(future);
}
+ // wait until connection is closed at least once
+ future.get();
}
- protected void makeLocalMetadataStoreKeepReconnectNIO() {
- new Thread(() -> {
- while (LocalMetadataStoreInReconnectFinishSignal.get()) {
+ private void startNIOImplTermination(CompletableFuture<Void> future) {
+ connectionTerminationThread = new Thread(() -> {
+ while (connectionTerminationThreadKeepRunning.get()) {
try {
SelectionKey sockKey =
WhiteboxImpl.getInternalState(localMetaDataStoreClientCnx, "sockKey");
if (sockKey != null) {
sockKey.channel().close();
+ future.complete(null);
}
// Prevents high cpu usage.
Thread.sleep(5);
@@ -121,16 +127,18 @@ public abstract class CanReconnectZKClientPulsarServiceBaseTest extends TestRetr
log.error("Try close the ZK connection of local metadata
store failed: {}", e.toString());
}
}
- }).start();
+ });
+ connectionTerminationThread.start();
}
- protected void makeLocalMetadataStoreKeepReconnectNetty() {
- new Thread(() -> {
- while (LocalMetadataStoreInReconnectFinishSignal.get()) {
+ private void startNettyImplTermination(CompletableFuture<Void> future) {
+ connectionTerminationThread = new Thread(() -> {
+ while (connectionTerminationThreadKeepRunning.get()) {
try {
Channel channel =
WhiteboxImpl.getInternalState(localMetaDataStoreClientCnx, "channel");
if (channel != null) {
channel.close();
+ future.complete(null);
}
// Prevents high cpu usage.
Thread.sleep(5);
@@ -138,11 +146,17 @@ public abstract class CanReconnectZKClientPulsarServiceBaseTest extends TestRetr
log.error("Try close the ZK connection of local metadata
store failed: {}", e.toString());
}
}
- }).start();
+ });
+ connectionTerminationThread.start();
}
- protected void stopLocalMetadataStoreAlwaysReconnect() {
- LocalMetadataStoreInReconnectFinishSignal.set(false);
+ protected void stopLocalMetadataStoreConnectionTermination() throws
InterruptedException {
+ connectionTerminationThreadKeepRunning.set(false);
+ if (connectionTerminationThread != null) {
+ // Wait for the reconnect thread to finish.
+ connectionTerminationThread.join();
+ connectionTerminationThread = null;
+ }
}
protected void createDefaultTenantsAndClustersAndNamespace() throws
Exception {
@@ -205,7 +219,7 @@ public abstract class CanReconnectZKClientPulsarServiceBaseTest extends TestRetr
markCurrentSetupNumberCleaned();
log.info("--- Shutting down ---");
- stopLocalMetadataStoreAlwaysReconnect();
+ stopLocalMetadataStoreConnectionTermination();
// Stop brokers.
if (client != null) {