This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 2b9ef9239ddb83b5baa175ed2ba0b4687b1acaac Author: Lari Hotari <[email protected]> AuthorDate: Mon Mar 23 17:11:17 2026 +0200 [fix][client] Fix stale Healthy state in SameAuthParamsLookupAutoClusterFailover causing flaky test (#25388) Co-authored-by: Claude Opus 4.6 (1M context) <[email protected]> (cherry picked from commit 3bc834f2fa8aae1ffe8f2fb3f795acd3d0de755d) --- .../SameAuthParamsLookupAutoClusterFailover.java | 5 + ...ameAuthParamsLookupAutoClusterFailoverTest.java | 254 +++++++++++++++++++++ 2 files changed, 259 insertions(+) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SameAuthParamsLookupAutoClusterFailover.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SameAuthParamsLookupAutoClusterFailover.java index 743f2e15164..5ae7b96906e 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SameAuthParamsLookupAutoClusterFailover.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SameAuthParamsLookupAutoClusterFailover.java @@ -143,6 +143,11 @@ public class SameAuthParamsLookupAutoClusterFailover implements ServiceUrlProvid for (int i = currentPulsarServiceIndex + 1; i < pulsarServiceUrlArray.length; i++) { if (probeAvailable(i)) { return i; + } else { + // Mark the service as Failed to prevent a spurious recovery to it + // after we failover to a higher-indexed service. + pulsarServiceStateArray[i] = PulsarServiceState.Failed; + checkCounterArray[i].setValue(0); } } return -1; diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/SameAuthParamsLookupAutoClusterFailoverTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/SameAuthParamsLookupAutoClusterFailoverTest.java new file mode 100644 index 00000000000..31e2bf6aaed --- /dev/null +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/SameAuthParamsLookupAutoClusterFailoverTest.java @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import io.netty.channel.EventLoopGroup; +import java.lang.reflect.Method; +import java.net.InetSocketAddress; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.mutable.MutableInt; +import org.apache.commons.lang3.reflect.FieldUtils; +import org.apache.pulsar.client.impl.SameAuthParamsLookupAutoClusterFailover.PulsarServiceState; +import org.apache.pulsar.client.util.ExecutorProvider; +import org.apache.pulsar.common.util.FutureUtil; +import org.apache.pulsar.common.util.netty.EventLoopUtil; +import org.awaitility.Awaitility; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +@Test(groups = "broker-impl") +public class SameAuthParamsLookupAutoClusterFailoverTest { + + private static final String URL0 = "pulsar://broker0:6650"; + private static final String URL1 = "pulsar://broker1:6650"; + private static final String URL2 = "pulsar://broker2:6650"; + + private EventLoopGroup executor; + private PulsarClientImpl mockClient; + private SameAuthParamsLookupAutoClusterFailover failover; + private PulsarServiceState[] stateArray; + private MutableInt[] counterArray; + + @BeforeMethod + public void setup() throws Exception { + executor = EventLoopUtil.newEventLoopGroup(1, false, + new ExecutorProvider.ExtendedThreadFactory("test-failover")); + + String[] urlArray = new String[]{URL0, URL1, URL2}; + failover = SameAuthParamsLookupAutoClusterFailover.builder() + .pulsarServiceUrlArray(urlArray) + .failoverThreshold(1) + .recoverThreshold(2) + .checkHealthyIntervalMs(100) + .testTopic("a/b/c") + .markTopicNotFoundAsAvailable(true) + .build(); + + mockClient = mock(PulsarClientImpl.class); + doNothing().when(mockClient).updateServiceUrl(anyString()); + doNothing().when(mockClient).reloadLookUp(); + + FieldUtils.writeField(failover, "pulsarClient", mockClient, true); + FieldUtils.writeField(failover, "executor", executor, true); + + stateArray = (PulsarServiceState[]) FieldUtils.readField(failover, "pulsarServiceStateArray", true); + counterArray = (MutableInt[]) FieldUtils.readField(failover, "checkCounterArray", true); + } + + @AfterMethod(alwaysRun = true) + public void cleanup() { + if (executor != null) { + executor.shutdownGracefully(0, 0, TimeUnit.MILLISECONDS); + } + } + + private void setLookupResult(String url, boolean available) { + LookupService lookup = mock(LookupService.class); + if (available) { + InetSocketAddress addr = InetSocketAddress.createUnresolved("broker", 6650); + when(lookup.getBroker(any())) + .thenReturn(CompletableFuture.completedFuture( + new LookupTopicResult(addr, addr, false))); + } else { + when(lookup.getBroker(any())) + .thenReturn(FutureUtil.failedFuture( + new RuntimeException("connection refused"))); + } + when(mockClient.getLookup(url)).thenReturn(lookup); + } + + /** + * Reproduces the race condition where findFailoverTo() skips over an unavailable service + * without marking it as Failed. This leaves a stale Healthy state that causes a spurious + * recovery bounce (0 -> 2 -> 1 -> 2) instead of a clean failover (0 -> 2). + * + * <p>Before the fix, after failover from index 0 to 2, state[1] remained Healthy (stale). + * On the next check cycle, firstHealthyPulsarService() would see state[1]=Healthy and + * immediately "recover" to index 1 — which is actually a broken service. This caused + * unnecessary bouncing and, combined with 3-second probe timeouts on dead services, + * could push the total failover time past the test's awaitility timeout. + */ + @Test(timeOut = 30000) + public void testFindFailoverToMarksSkippedServicesAsFailed() throws Exception { + // url0 is down, url1 is down, url2 is healthy. + setLookupResult(URL0, false); + setLookupResult(URL1, false); + setLookupResult(URL2, true); + + // Pre-set state[0] to Failed (as if checkPulsarServices already detected it), + // then run one check cycle. All on the executor to ensure thread safety. + runOnExecutor(() -> { + stateArray[0] = PulsarServiceState.Failed; + counterArray[0].setValue(0); + }); + runCheckCycle(); + + // After the fix, findFailoverTo marks url1 as Failed when it fails probing. + // Verify on the executor thread where state is owned. + runOnExecutor(() -> { + assertEquals(failover.getCurrentPulsarServiceIndex(), 2, + "Should have failed over to index 2"); + assertEquals(stateArray[1], PulsarServiceState.Failed, + "Service 1 should be marked Failed by findFailoverTo, not remain stale Healthy"); + assertEquals(stateArray[2], PulsarServiceState.Healthy, + "Service 2 should remain Healthy"); + }); + } + + /** + * Verifies no spurious recovery bounce occurs after failover. Without the fix, + * the first check cycle after failover to index 2 would see stale Healthy state[1] + * and immediately switch to index 1. + */ + @Test(timeOut = 30000) + public void testNoSpuriousRecoveryBounceAfterFailover() throws Exception { + // url0 is down, url1 is down, url2 is healthy. + setLookupResult(URL0, false); + setLookupResult(URL1, false); + setLookupResult(URL2, true); + + // Pre-set state[0] to Failed. + runOnExecutor(() -> { + stateArray[0] = PulsarServiceState.Failed; + counterArray[0].setValue(0); + }); + + // Failover: 0 -> 2. + runCheckCycle(); + runOnExecutor(() -> assertEquals(failover.getCurrentPulsarServiceIndex(), 2)); + + // Run another check cycle. Without the fix, state[1] would be stale Healthy, + // and firstHealthyPulsarService would return 1, causing a spurious switch. + runCheckCycle(); + runOnExecutor(() -> assertEquals(failover.getCurrentPulsarServiceIndex(), 2, + "Should stay at index 2, not bounce to index 1")); + } + + /** + * Verifies that recovery still works correctly for a service that was marked Failed + * by findFailoverTo, once that service becomes available again. + */ + @Test(timeOut = 30000) + public void testRecoveryAfterFindFailoverToMarksServiceFailed() throws Exception { + // url0 is down, url1 is down, url2 is healthy. + setLookupResult(URL0, false); + setLookupResult(URL1, false); + setLookupResult(URL2, true); + + // Pre-set state[0] to Failed and trigger failover 0 -> 2. + runOnExecutor(() -> { + stateArray[0] = PulsarServiceState.Failed; + counterArray[0].setValue(0); + }); + runCheckCycle(); + runOnExecutor(() -> { + assertEquals(failover.getCurrentPulsarServiceIndex(), 2); + assertEquals(stateArray[1], PulsarServiceState.Failed); + }); + + // Now make url1 healthy (simulating recovery of that service). + setLookupResult(URL1, true); + + // Run check cycles until service 1 recovers. + // Failed -> PreRecover (1 check) -> Healthy (recoverThreshold=2, so 1 more check). + Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> { + runCheckCycle(); + runOnExecutor(() -> { + assertEquals(failover.getCurrentPulsarServiceIndex(), 1, + "Should recover to index 1 after it becomes healthy"); + assertEquals(stateArray[1], PulsarServiceState.Healthy); + }); + }); + } + + private void runOnExecutor(Runnable task) throws Exception { + try { + executor.submit(task).get(5, TimeUnit.SECONDS); + } catch (java.util.concurrent.ExecutionException e) { + // Unwrap so that AssertionErrors propagate directly to Awaitility. + if (e.getCause() instanceof AssertionError) { + throw (AssertionError) e.getCause(); + } + throw e; + } + } + + private void runCheckCycle() throws Exception { + runOnExecutor(() -> { + try { + Method checkMethod = SameAuthParamsLookupAutoClusterFailover.class + .getDeclaredMethod("checkPulsarServices"); + checkMethod.setAccessible(true); + Method firstHealthyMethod = SameAuthParamsLookupAutoClusterFailover.class + .getDeclaredMethod("firstHealthyPulsarService"); + firstHealthyMethod.setAccessible(true); + Method findFailoverMethod = SameAuthParamsLookupAutoClusterFailover.class + .getDeclaredMethod("findFailoverTo"); + findFailoverMethod.setAccessible(true); + Method updateMethod = SameAuthParamsLookupAutoClusterFailover.class + .getDeclaredMethod("updateServiceUrl", int.class); + updateMethod.setAccessible(true); + + checkMethod.invoke(failover); + int firstHealthy = (int) firstHealthyMethod.invoke(failover); + int currentIndex = failover.getCurrentPulsarServiceIndex(); + if (firstHealthy != currentIndex) { + if (firstHealthy < 0) { + int failoverTo = (int) findFailoverMethod.invoke(failover); + if (failoverTo >= 0) { + updateMethod.invoke(failover, failoverTo); + } + } else { + updateMethod.invoke(failover, firstHealthy); + } + } + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } +}
