This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 6b94d335fa260c03f6c407f11b47c032d0221a7e
Author: sinan liu <[email protected]>
AuthorDate: Mon Feb 9 17:06:17 2026 +0800

    [fix][test] Fix Mockito stubbing race in TopicListServiceTest (#25227)
    
    (cherry picked from commit c93dd7ad8705fa6ef9019ae796892ad2d2177b61)
---
 .../broker/service/TopicListServiceTest.java       | 26 ++++++++++++----------
 1 file changed, 14 insertions(+), 12 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicListServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicListServiceTest.java
index 5caa7712891..ec566a4734c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicListServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicListServiceTest.java
@@ -336,6 +336,20 @@ public class TopicListServiceTest {
 
     @Test
     public void testCommandWatchUpdateRetries() {
+        AtomicInteger failureCount = new AtomicInteger(0);
+        // Set up the stubbing before starting async work to avoid races with Mockito stubbing state.
+        doAnswer(invocationOnMock -> {
+            List<String> newTopicsArg = invocationOnMock.getArgument(1);
+            if (!newTopicsArg.isEmpty() && failureCount.incrementAndGet() < 3) 
{
+                Throwable failure = new 
AsyncSemaphore.PermitAcquireTimeoutException("Acquire timed out");
+                Function<Throwable, CompletableFuture<Void>> 
permitAcquireErrorHandler =
+                        invocationOnMock.getArgument(4);
+                return permitAcquireErrorHandler.apply(failure);
+            } else {
+                return CompletableFuture.completedFuture(null);
+            }
+        }).when(pulsarCommandSender).sendWatchTopicListUpdate(anyLong(), 
any(), any(), anyString(), any());
+
         topicListService.handleWatchTopicList(
                 NamespaceName.get("tenant/ns"),
                 13,
@@ -349,18 +363,6 @@ public class TopicListServiceTest {
 
         List<String> newTopics = 
Collections.singletonList("persistent://tenant/ns/topic2");
         String hash = TopicList.calculateHash(ListUtils.union(topics, 
newTopics));
-        AtomicInteger failureCount = new AtomicInteger(0);
-        doAnswer(invocationOnMock -> {
-            List<String> newTopicsArg = invocationOnMock.getArgument(1);
-            if (!newTopicsArg.isEmpty() && failureCount.incrementAndGet() < 3) 
{
-                Throwable failure = new 
AsyncSemaphore.PermitAcquireTimeoutException("Acquire timed out");
-                Function<Throwable, CompletableFuture<Void>> 
permitAcquireErrorHandler =
-                        invocationOnMock.getArgument(4);
-                return permitAcquireErrorHandler.apply(failure);
-            } else {
-                return CompletableFuture.completedFuture(null);
-            }
-        }).when(pulsarCommandSender).sendWatchTopicListUpdate(anyLong(), 
any(), any(), anyString(), any());
         notificationConsumer.accept(
                 new Notification(NotificationType.Created, 
"/managed-ledgers/tenant/ns/persistent/topic2"));
         notificationConsumer.accept(

Reply via email to