This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new a74c0599e04 [fix][test] Fix test testAsyncFunctionMaxPending (#22121)
a74c0599e04 is described below
commit a74c0599e04bdbeb9e306159a9765069fda97e97
Author: Xiangying Meng <[email protected]>
AuthorDate: Tue Feb 27 22:22:10 2024 +0800
[fix][test] Fix test testAsyncFunctionMaxPending (#22121)
---
.../apache/pulsar/functions/instance/JavaInstanceTest.java | 11 +++++++++--
1 file changed, 9 insertions(+), 2 deletions(-)
diff --git
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java
index efe80922dfa..5a333204293 100644
---
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java
+++
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java
@@ -24,6 +24,7 @@ import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertSame;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import lombok.Cleanup;
@@ -185,6 +186,7 @@ public class JavaInstanceTest {
@Test
public void testAsyncFunctionMaxPending() throws Exception {
+ CountDownLatch count = new CountDownLatch(1);
InstanceConfig instanceConfig = new InstanceConfig();
int pendingQueueSize = 3;
instanceConfig.setMaxPendingAsyncRequests(pendingQueueSize);
@@ -196,7 +198,7 @@ public class JavaInstanceTest {
CompletableFuture<String> result = new CompletableFuture<>();
executor.submit(() -> {
try {
- Thread.sleep(500);
+ count.await();
result.complete(String.format("%s-lambda", input));
} catch (Exception e) {
result.completeExceptionally(e);
@@ -222,8 +224,13 @@ public class JavaInstanceTest {
// no space left
assertEquals(0,
instance.getPendingAsyncRequests().remainingCapacity());
+ AsyncFuncRequest[] asyncFuncRequests = new AsyncFuncRequest[3];
for (int i = 0; i < 3; i++) {
- AsyncFuncRequest request =
instance.getPendingAsyncRequests().poll();
+ asyncFuncRequests[i] = instance.getPendingAsyncRequests().poll();
+ }
+
+ count.countDown();
+ for (AsyncFuncRequest request : asyncFuncRequests) {
Assert.assertEquals(request.getProcessResult().get(), testString +
"-lambda");
}