This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.1 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 6b94d335fa260c03f6c407f11b47c032d0221a7e Author: sinan liu <[email protected]> AuthorDate: Mon Feb 9 17:06:17 2026 +0800 [fix][test] Fix Mockito stubbing race in TopicListServiceTest (#25227) (cherry picked from commit c93dd7ad8705fa6ef9019ae796892ad2d2177b61) --- .../broker/service/TopicListServiceTest.java | 26 ++++++++++++---------- 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicListServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicListServiceTest.java index 5caa7712891..ec566a4734c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicListServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicListServiceTest.java @@ -336,6 +336,20 @@ public class TopicListServiceTest { @Test public void testCommandWatchUpdateRetries() { + AtomicInteger failureCount = new AtomicInteger(0); + // Set up the stubbing before starting async work to avoid races with Mockito stubbing state. + doAnswer(invocationOnMock -> { + List<String> newTopicsArg = invocationOnMock.getArgument(1); + if (!newTopicsArg.isEmpty() && failureCount.incrementAndGet() < 3) { + Throwable failure = new AsyncSemaphore.PermitAcquireTimeoutException("Acquire timed out"); + Function<Throwable, CompletableFuture<Void>> permitAcquireErrorHandler = + invocationOnMock.getArgument(4); + return permitAcquireErrorHandler.apply(failure); + } else { + return CompletableFuture.completedFuture(null); + } + }).when(pulsarCommandSender).sendWatchTopicListUpdate(anyLong(), any(), any(), anyString(), any()); + topicListService.handleWatchTopicList( NamespaceName.get("tenant/ns"), 13, @@ -349,18 +363,6 @@ public class TopicListServiceTest { List<String> newTopics = Collections.singletonList("persistent://tenant/ns/topic2"); String hash = TopicList.calculateHash(ListUtils.union(topics, newTopics)); - AtomicInteger failureCount = new AtomicInteger(0); - doAnswer(invocationOnMock -> { - List<String> newTopicsArg = invocationOnMock.getArgument(1); - if (!newTopicsArg.isEmpty() && failureCount.incrementAndGet() < 3) { - Throwable failure = new AsyncSemaphore.PermitAcquireTimeoutException("Acquire timed out"); - Function<Throwable, CompletableFuture<Void>> permitAcquireErrorHandler = - invocationOnMock.getArgument(4); - return permitAcquireErrorHandler.apply(failure); - } else { - return CompletableFuture.completedFuture(null); - } - }).when(pulsarCommandSender).sendWatchTopicListUpdate(anyLong(), any(), any(), anyString(), any()); notificationConsumer.accept( new Notification(NotificationType.Created, "/managed-ledgers/tenant/ns/persistent/topic2")); notificationConsumer.accept(
