This is an automated email from the ASF dual-hosted git repository.

merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 17c6b139915 [fix][test] Make SameAuthParamsLookupAutoClusterFailoverTest less timing-sensitive (#25675)
17c6b139915 is described below

commit 17c6b139915b4cc299d29a071839a128479f68c9
Author: Matteo Merli <[email protected]>
AuthorDate: Tue May 5 14:22:56 2026 -0700

    [fix][test] Make SameAuthParamsLookupAutoClusterFailoverTest less timing-sensitive (#25675)
---
 ...ameAuthParamsLookupAutoClusterFailoverTest.java | 89 ++++++++++------------
 1 file changed, 40 insertions(+), 49 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SameAuthParamsLookupAutoClusterFailoverTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SameAuthParamsLookupAutoClusterFailoverTest.java
index c7bd197e486..a82ec4e803d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SameAuthParamsLookupAutoClusterFailoverTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SameAuthParamsLookupAutoClusterFailoverTest.java
@@ -64,11 +64,11 @@ public class SameAuthParamsLookupAutoClusterFailoverTest extends OneWayReplicato
     }
     @SuppressWarnings("deprecation")
 
-    // Each state-convergence phase below waits up to 2 minutes. With 3 phases 
plus
-    // cluster startup/teardown, the 240s overall timeout can be tight on slow 
CI agents
-    // (the probe timeout is 3s and recoverThreshold=5, so a single slow probe 
can
-    // stretch a phase to ~30s). Allow 8 minutes overall to match the 
per-phase budget.
-    @Test(dataProvider = "enabledTls", timeOut = 480 * 1000)
+    // Each state-convergence phase below waits up to 3 minutes. The probe 
timeout is 3s
+    // and recoverThreshold=5, so a transient probe failure during recovery 
resets the
+    // counter and a phase can need ~30s of healthy probes to recover. With 3 
phases plus
+    // cluster startup/teardown, allow 12 minutes overall to absorb slow CI 
agents.
+    @Test(dataProvider = "enabledTls", timeOut = 720 * 1000)
     public void testAutoClusterFailover(boolean enabledTls) throws Exception {
         // Start clusters.
         setup();
@@ -110,63 +110,30 @@ public class SameAuthParamsLookupAutoClusterFailoverTest extends OneWayReplicato
         producer.send("0");
         Assert.assertEquals(failover.getCurrentPulsarServiceIndex(), 0);
 
-        CompletableFuture<Boolean> checkStatesFuture1 = new 
CompletableFuture<>();
-        executor.submit(() -> {
-            boolean res = stateArray[0] == PulsarServiceState.Healthy;
-            res = res & stateArray[1] == PulsarServiceState.Healthy;
-            res = res & stateArray[2] == PulsarServiceState.Healthy;
-            checkStatesFuture1.complete(res);
-        });
-        Assert.assertTrue(checkStatesFuture1.join());
+        assertStatesEqual(executor, stateArray,
+                PulsarServiceState.Healthy, PulsarServiceState.Healthy, 
PulsarServiceState.Healthy);
 
         // Test failover 0 --> 2.
         pulsar1.close();
-        Awaitility.await().atMost(120, TimeUnit.SECONDS).untilAsserted(() -> {
-            CompletableFuture<Boolean> checkStatesFuture2 = new 
CompletableFuture<>();
-            executor.submit(() -> {
-                boolean res = stateArray[0] == PulsarServiceState.Failed;
-                res = res & stateArray[1] == PulsarServiceState.Failed;
-                res = res & stateArray[2] == PulsarServiceState.Healthy;
-                checkStatesFuture2.complete(res);
-            });
-            Assert.assertTrue(checkStatesFuture2.join());
-            producer.send("0->2");
-            Assert.assertEquals(failover.getCurrentPulsarServiceIndex(), 2);
-        });
+        awaitStatesAndIndex(executor, stateArray, failover, 2,
+                PulsarServiceState.Failed, PulsarServiceState.Failed, 
PulsarServiceState.Healthy);
+        producer.send("0->2");
 
         // Test recover 2 --> 1.
         executor.execute(() -> {
             urlArray[1] = url2;
         });
-        Awaitility.await().atMost(120, TimeUnit.SECONDS).untilAsserted(() -> {
-            CompletableFuture<Boolean> checkStatesFuture3 = new 
CompletableFuture<>();
-            executor.submit(() -> {
-                boolean res = stateArray[0] == PulsarServiceState.Failed;
-                res = res & stateArray[1] == PulsarServiceState.Healthy;
-                res = res & stateArray[2] == PulsarServiceState.Healthy;
-                checkStatesFuture3.complete(res);
-            });
-            Assert.assertTrue(checkStatesFuture3.join());
-            producer.send("2->1");
-            Assert.assertEquals(failover.getCurrentPulsarServiceIndex(), 1);
-        });
+        awaitStatesAndIndex(executor, stateArray, failover, 1,
+                PulsarServiceState.Failed, PulsarServiceState.Healthy, 
PulsarServiceState.Healthy);
+        producer.send("2->1");
 
         // Test recover 1 --> 0.
         executor.execute(() -> {
             urlArray[0] = url2;
         });
-        Awaitility.await().atMost(120, TimeUnit.SECONDS).untilAsserted(() -> {
-            CompletableFuture<Boolean> checkStatesFuture4 = new 
CompletableFuture<>();
-            executor.submit(() -> {
-                boolean res = stateArray[0] == PulsarServiceState.Healthy;
-                res = res & stateArray[1] == PulsarServiceState.Healthy;
-                res = res & stateArray[2] == PulsarServiceState.Healthy;
-                checkStatesFuture4.complete(res);
-            });
-            Assert.assertTrue(checkStatesFuture4.join());
-            producer.send("1->0");
-            Assert.assertEquals(failover.getCurrentPulsarServiceIndex(), 0);
-        });
+        awaitStatesAndIndex(executor, stateArray, failover, 0,
+                PulsarServiceState.Healthy, PulsarServiceState.Healthy, 
PulsarServiceState.Healthy);
+        producer.send("1->0");
 
         // cleanup.
         producer.close();
@@ -174,6 +141,30 @@ public class SameAuthParamsLookupAutoClusterFailoverTest extends OneWayReplicato
         dummyServer.close();
     }
 
+    /**
+     * Wait for the state machine to converge to the expected per-index states 
and current index.
+     * The state read happens on the failover executor to avoid races with the 
periodic check task,
+     * and producer/lookup operations are kept out of the polling loop so a 
slow message send does
+     * not consume the convergence budget.
+     */
+    private static void awaitStatesAndIndex(EventLoopGroup executor, 
PulsarServiceState[] stateArray,
+                                            
SameAuthParamsLookupAutoClusterFailover failover,
+                                            int expectedIndex,
+                                            PulsarServiceState... 
expectedStates) {
+        Awaitility.await().atMost(180, TimeUnit.SECONDS).untilAsserted(() -> {
+            assertStatesEqual(executor, stateArray, expectedStates);
+            Assert.assertEquals(failover.getCurrentPulsarServiceIndex(), 
expectedIndex);
+        });
+    }
+
+    private static void assertStatesEqual(EventLoopGroup executor, 
PulsarServiceState[] stateArray,
+                                          PulsarServiceState... expected) 
throws Exception {
+        CompletableFuture<PulsarServiceState[]> snapshot = new 
CompletableFuture<>();
+        executor.submit(() -> snapshot.complete(stateArray.clone()));
+        PulsarServiceState[] actual = snapshot.get(10, TimeUnit.SECONDS);
+        Assert.assertEquals(actual, expected);
+    }
+
     @Override
     protected void cleanupPulsarResources() {
         // Nothing to do.

Reply via email to