This is an automated email from the ASF dual-hosted git repository.
merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 17c6b139915 [fix][test] Make SameAuthParamsLookupAutoClusterFailoverTest less timing-sensitive (#25675)
17c6b139915 is described below
commit 17c6b139915b4cc299d29a071839a128479f68c9
Author: Matteo Merli <[email protected]>
AuthorDate: Tue May 5 14:22:56 2026 -0700
[fix][test] Make SameAuthParamsLookupAutoClusterFailoverTest less timing-sensitive (#25675)
---
...ameAuthParamsLookupAutoClusterFailoverTest.java | 89 ++++++++++------------
1 file changed, 40 insertions(+), 49 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SameAuthParamsLookupAutoClusterFailoverTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SameAuthParamsLookupAutoClusterFailoverTest.java
index c7bd197e486..a82ec4e803d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SameAuthParamsLookupAutoClusterFailoverTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SameAuthParamsLookupAutoClusterFailoverTest.java
@@ -64,11 +64,11 @@ public class SameAuthParamsLookupAutoClusterFailoverTest
extends OneWayReplicato
}
@SuppressWarnings("deprecation")
- // Each state-convergence phase below waits up to 2 minutes. With 3 phases plus
- // cluster startup/teardown, the 240s overall timeout can be tight on slow CI agents
- // (the probe timeout is 3s and recoverThreshold=5, so a single slow probe can
- // stretch a phase to ~30s). Allow 8 minutes overall to match the per-phase budget.
- @Test(dataProvider = "enabledTls", timeOut = 480 * 1000)
+ // Each state-convergence phase below waits up to 3 minutes. The probe timeout is 3s
+ // and recoverThreshold=5, so a transient probe failure during recovery resets the
+ // counter and a phase can need ~30s of healthy probes to recover. With 3 phases plus
+ // cluster startup/teardown, allow 12 minutes overall to absorb slow CI agents.
+ @Test(dataProvider = "enabledTls", timeOut = 720 * 1000)
public void testAutoClusterFailover(boolean enabledTls) throws Exception {
// Start clusters.
setup();
@@ -110,63 +110,30 @@ public class SameAuthParamsLookupAutoClusterFailoverTest
extends OneWayReplicato
producer.send("0");
Assert.assertEquals(failover.getCurrentPulsarServiceIndex(), 0);
- CompletableFuture<Boolean> checkStatesFuture1 = new
CompletableFuture<>();
- executor.submit(() -> {
- boolean res = stateArray[0] == PulsarServiceState.Healthy;
- res = res & stateArray[1] == PulsarServiceState.Healthy;
- res = res & stateArray[2] == PulsarServiceState.Healthy;
- checkStatesFuture1.complete(res);
- });
- Assert.assertTrue(checkStatesFuture1.join());
+ assertStatesEqual(executor, stateArray,
+ PulsarServiceState.Healthy, PulsarServiceState.Healthy,
PulsarServiceState.Healthy);
// Test failover 0 --> 2.
pulsar1.close();
- Awaitility.await().atMost(120, TimeUnit.SECONDS).untilAsserted(() -> {
- CompletableFuture<Boolean> checkStatesFuture2 = new
CompletableFuture<>();
- executor.submit(() -> {
- boolean res = stateArray[0] == PulsarServiceState.Failed;
- res = res & stateArray[1] == PulsarServiceState.Failed;
- res = res & stateArray[2] == PulsarServiceState.Healthy;
- checkStatesFuture2.complete(res);
- });
- Assert.assertTrue(checkStatesFuture2.join());
- producer.send("0->2");
- Assert.assertEquals(failover.getCurrentPulsarServiceIndex(), 2);
- });
+ awaitStatesAndIndex(executor, stateArray, failover, 2,
+ PulsarServiceState.Failed, PulsarServiceState.Failed,
PulsarServiceState.Healthy);
+ producer.send("0->2");
// Test recover 2 --> 1.
executor.execute(() -> {
urlArray[1] = url2;
});
- Awaitility.await().atMost(120, TimeUnit.SECONDS).untilAsserted(() -> {
- CompletableFuture<Boolean> checkStatesFuture3 = new
CompletableFuture<>();
- executor.submit(() -> {
- boolean res = stateArray[0] == PulsarServiceState.Failed;
- res = res & stateArray[1] == PulsarServiceState.Healthy;
- res = res & stateArray[2] == PulsarServiceState.Healthy;
- checkStatesFuture3.complete(res);
- });
- Assert.assertTrue(checkStatesFuture3.join());
- producer.send("2->1");
- Assert.assertEquals(failover.getCurrentPulsarServiceIndex(), 1);
- });
+ awaitStatesAndIndex(executor, stateArray, failover, 1,
+ PulsarServiceState.Failed, PulsarServiceState.Healthy,
PulsarServiceState.Healthy);
+ producer.send("2->1");
// Test recover 1 --> 0.
executor.execute(() -> {
urlArray[0] = url2;
});
- Awaitility.await().atMost(120, TimeUnit.SECONDS).untilAsserted(() -> {
- CompletableFuture<Boolean> checkStatesFuture4 = new
CompletableFuture<>();
- executor.submit(() -> {
- boolean res = stateArray[0] == PulsarServiceState.Healthy;
- res = res & stateArray[1] == PulsarServiceState.Healthy;
- res = res & stateArray[2] == PulsarServiceState.Healthy;
- checkStatesFuture4.complete(res);
- });
- Assert.assertTrue(checkStatesFuture4.join());
- producer.send("1->0");
- Assert.assertEquals(failover.getCurrentPulsarServiceIndex(), 0);
- });
+ awaitStatesAndIndex(executor, stateArray, failover, 0,
+ PulsarServiceState.Healthy, PulsarServiceState.Healthy,
PulsarServiceState.Healthy);
+ producer.send("1->0");
// cleanup.
producer.close();
@@ -174,6 +141,30 @@ public class SameAuthParamsLookupAutoClusterFailoverTest
extends OneWayReplicato
dummyServer.close();
}
+ /**
+ * Wait for the state machine to converge to the expected per-index states and current index.
+ * The state read happens on the failover executor to avoid races with the periodic check task,
+ * and producer/lookup operations are kept out of the polling loop so a slow message send does
+ * not consume the convergence budget.
+ */
+ private static void awaitStatesAndIndex(EventLoopGroup executor,
PulsarServiceState[] stateArray,
+
SameAuthParamsLookupAutoClusterFailover failover,
+ int expectedIndex,
+ PulsarServiceState...
expectedStates) {
+ Awaitility.await().atMost(180, TimeUnit.SECONDS).untilAsserted(() -> {
+ assertStatesEqual(executor, stateArray, expectedStates);
+ Assert.assertEquals(failover.getCurrentPulsarServiceIndex(),
expectedIndex);
+ });
+ }
+
+ private static void assertStatesEqual(EventLoopGroup executor,
PulsarServiceState[] stateArray,
+ PulsarServiceState... expected)
throws Exception {
+ CompletableFuture<PulsarServiceState[]> snapshot = new
CompletableFuture<>();
+ executor.submit(() -> snapshot.complete(stateArray.clone()));
+ PulsarServiceState[] actual = snapshot.get(10, TimeUnit.SECONDS);
+ Assert.assertEquals(actual, expected);
+ }
+
@Override
protected void cleanupPulsarResources() {
// Nothing to do.