This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 7140420cf42 [fix][test] Fix flaky
SubscriptionSeekTest.testSeekWillNotEncounteredFencedError by counting
subscription is fenced only after seek (#24865)
7140420cf42 is described below
commit 7140420cf4277d5818a7f87d11d7ee2cdee4a254
Author: sinan liu <[email protected]>
AuthorDate: Sat Oct 18 03:22:32 2025 +0800
[fix][test] Fix flaky
SubscriptionSeekTest.testSeekWillNotEncounteredFencedError by counting
subscription is fenced only after seek (#24865)
---
.../apache/pulsar/broker/service/SubscriptionSeekTest.java | 11 +++++++----
1 file changed, 7 insertions(+), 4 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
index ffdb8610d58..ec16cc7d9eb 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
@@ -38,6 +38,7 @@ import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import lombok.Cleanup;
@@ -1046,11 +1047,14 @@ public class SubscriptionSeekTest extends
BrokerTestBase {
// Create a pulsar client with a subscription fenced counter.
ClientBuilderImpl clientBuilder = (ClientBuilderImpl)
PulsarClient.builder().serviceUrl(lookupUrl.toString());
AtomicInteger receivedFencedErrorCounter = new AtomicInteger();
+ // Counting switch: off by default; turned on just before the seek starts.
+ final AtomicBoolean countAfterSeek = new AtomicBoolean(false);
@Cleanup
PulsarClient client =
InjectedClientCnxClientBuilder.create(clientBuilder, (conf, eventLoopGroup) ->
new ClientCnx(InstrumentProvider.NOOP, conf, eventLoopGroup) {
protected void handleError(CommandError error) {
- if (error.getMessage() != null &&
error.getMessage().contains("Subscription is fenced")) {
+ if (error.getMessage() != null &&
error.getMessage().contains("Subscription is fenced")
+ && countAfterSeek.get()) {
receivedFencedErrorCounter.incrementAndGet();
}
super.handleError(error);
@@ -1087,10 +1091,9 @@ public class SubscriptionSeekTest extends BrokerTestBase
{
assertNotNull(msg);
consumer.acknowledge(msg);
}
+ countAfterSeek.set(true);
consumer.seek(msgId1);
- Awaitility.await().untilAsserted(() -> {
- assertTrue(consumer.isConnected());
- });
+ Awaitility.await().untilAsserted(() ->
assertTrue(consumer.isConnected()));
assertEquals(receivedFencedErrorCounter.get(), 0);
// cleanup.