This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 387ea92c330 [fix][test] Stabilize FunctionAssignmentTailerTest.testErrorNotifier by synchronizing mock stubbing with CountDownLatch (#24875)
387ea92c330 is described below

commit 387ea92c3303f805a6d3d7a4580dd0c69278cb07
Author: sinan liu <[email protected]>
AuthorDate: Mon Oct 27 20:22:48 2025 +0800

    [fix][test] Stabilize FunctionAssignmentTailerTest.testErrorNotifier by synchronizing mock stubbing with CountDownLatch (#24875)
---
 .../functions/worker/FunctionAssignmentTailerTest.java | 18 ++++++++++++++++++
 1 file changed, 18 insertions(+)

diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailerTest.java
index c78c68f8923..14bd0cde268 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailerTest.java
@@ -23,6 +23,7 @@ import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
@@ -36,6 +37,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -148,6 +150,17 @@ public class FunctionAssignmentTailerTest {
         // test new assignment add functions
         FunctionRuntimeManager functionRuntimeManager = 
mock(FunctionRuntimeManager.class);
 
+        // Use CountDownLatch to park the background thread after first processing and before re-stubbing
+        CountDownLatch firstProcessed = new 
java.util.concurrent.CountDownLatch(1);
+        CountDownLatch release = new java.util.concurrent.CountDownLatch(1);
+
+        // On first processing of message1, block and wait for re-stubbing
+        doAnswer(inv -> {
+            firstProcessed.countDown();
+            release.await(5, TimeUnit.SECONDS);
+            return null;
+        }).when(functionRuntimeManager).processAssignmentMessage(eq(message1));
+
         FunctionAssignmentTailer functionAssignmentTailer =
                 spy(new FunctionAssignmentTailer(functionRuntimeManager, 
readerBuilder, workerConfig, errorNotifier));
 
@@ -157,12 +170,17 @@ public class FunctionAssignmentTailerTest {
         verify(errorNotifier, times(0)).triggerError(any());
 
         messageList.add(message1);
+        Assert.assertTrue(firstProcessed.await(5, TimeUnit.SECONDS),
+                "First processing did not reach the blocking point");
 
         verify(errorNotifier, times(0)).triggerError(any());
 
         // trigger an error to be thrown
         doThrow(new 
RuntimeException("test")).when(functionRuntimeManager).processAssignmentMessage(any());
 
+        // Release the first processing
+        release.countDown();
+
         messageList.add(message2);
 
         try {

Reply via email to