This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 387ea92c330 [fix][test] Stabilize FunctionAssignmentTailerTest.testErrorNotifier by synchronizing mock stubbing with CountDownLatch (#24875)
387ea92c330 is described below
commit 387ea92c3303f805a6d3d7a4580dd0c69278cb07
Author: sinan liu <[email protected]>
AuthorDate: Mon Oct 27 20:22:48 2025 +0800
[fix][test] Stabilize FunctionAssignmentTailerTest.testErrorNotifier by
synchronizing mock stubbing with CountDownLatch (#24875)
---
.../functions/worker/FunctionAssignmentTailerTest.java | 18 ++++++++++++++++++
1 file changed, 18 insertions(+)
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailerTest.java
index c78c68f8923..14bd0cde268 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailerTest.java
@@ -23,6 +23,7 @@ import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
@@ -36,6 +37,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -148,6 +150,17 @@ public class FunctionAssignmentTailerTest {
// test new assignment add functions
FunctionRuntimeManager functionRuntimeManager =
mock(FunctionRuntimeManager.class);
+ // Use CountDownLatch to park the background thread after first processing and before re-stubbing
+ CountDownLatch firstProcessed = new
java.util.concurrent.CountDownLatch(1);
+ CountDownLatch release = new java.util.concurrent.CountDownLatch(1);
+
+ // On first processing of message1, block and wait for re-stubbing
+ doAnswer(inv -> {
+ firstProcessed.countDown();
+ release.await(5, TimeUnit.SECONDS);
+ return null;
+ }).when(functionRuntimeManager).processAssignmentMessage(eq(message1));
+
FunctionAssignmentTailer functionAssignmentTailer =
spy(new FunctionAssignmentTailer(functionRuntimeManager,
readerBuilder, workerConfig, errorNotifier));
@@ -157,12 +170,17 @@ public class FunctionAssignmentTailerTest {
verify(errorNotifier, times(0)).triggerError(any());
messageList.add(message1);
+ Assert.assertTrue(firstProcessed.await(5, TimeUnit.SECONDS),
+ "First processing did not reach the blocking point");
verify(errorNotifier, times(0)).triggerError(any());
// trigger an error to be thrown
doThrow(new
RuntimeException("test")).when(functionRuntimeManager).processAssignmentMessage(any());
+ // Release the first processing
+ release.countDown();
+
messageList.add(message2);
try {