This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 37c024c1167c62ed204194ab1fca55f871e89ad8 Author: sinan liu <[email protected]> AuthorDate: Sat Oct 18 03:22:32 2025 +0800 [fix][test] Fix flaky SubscriptionSeekTest.testSeekWillNotEncounteredFencedError by counting subscription is fenced only after seek (#24865) (cherry picked from commit 678db6b34587b0042235f0d31e4c406bb0263bcd) --- .../apache/pulsar/broker/service/SubscriptionSeekTest.java | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java index fb3f0dc4302..a06716d54b9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java @@ -38,6 +38,7 @@ import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import lombok.Cleanup; @@ -1035,11 +1036,14 @@ public class SubscriptionSeekTest extends BrokerTestBase { // Create a pulsar client with a subscription fenced counter. ClientBuilderImpl clientBuilder = (ClientBuilderImpl) PulsarClient.builder().serviceUrl(lookupUrl.toString()); AtomicInteger receivedFencedErrorCounter = new AtomicInteger(); + // Count switch: Default off, turn on again before seek starts. + final AtomicBoolean countAfterSeek = new AtomicBoolean(false); @Cleanup PulsarClient client = InjectedClientCnxClientBuilder.create(clientBuilder, (conf, eventLoopGroup) -> new ClientCnx(conf, eventLoopGroup) { protected void handleError(CommandError error) { - if (error.getMessage() != null && error.getMessage().contains("Subscription is fenced")) { + if (error.getMessage() != null && error.getMessage().contains("Subscription is fenced") + && countAfterSeek.get()) { receivedFencedErrorCounter.incrementAndGet(); } super.handleError(error); @@ -1076,10 +1080,9 @@ public class SubscriptionSeekTest extends BrokerTestBase { assertNotNull(msg); consumer.acknowledge(msg); } + countAfterSeek.set(true); consumer.seek(msgId1); - Awaitility.await().untilAsserted(() -> { - assertTrue(consumer.isConnected()); - }); + Awaitility.await().untilAsserted(() -> assertTrue(consumer.isConnected())); assertEquals(receivedFencedErrorCounter.get(), 0); // cleanup.
