This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit a8512e7a03725feb9fc398f3b69f71341c7704d7
Author: Lari Hotari <[email protected]>
AuthorDate: Tue Dec 17 18:33:25 2024 +0200

    [improve][fn] Improve implementation for maxPendingAsyncRequests async 
concurrency limit when return type is CompletableFuture<Void> (#23708)
    
    (cherry picked from commit 8ad67776fc0787fe857d999a72f9df1e95e4210d)
---
 .../functions/instance/JavaExecutionResult.java    |  4 +-
 .../pulsar/functions/instance/JavaInstance.java    | 77 +++++++++++++++++-----
 .../functions/instance/JavaInstanceRunnable.java   |  2 +-
 .../functions/instance/JavaInstanceTest.java       | 64 ++++++++++++++++++
 4 files changed, 127 insertions(+), 20 deletions(-)

diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaExecutionResult.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaExecutionResult.java
index 7af238154d6..5856600196b 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaExecutionResult.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaExecutionResult.java
@@ -27,13 +27,11 @@ import lombok.Data;
  */
 @Data
 public class JavaExecutionResult {
-    private Exception userException;
-    private Exception systemException;
+    private Throwable userException;
     private Object result;
 
     public void reset() {
         setUserException(null);
-        setSystemException(null);
         setResult(null);
     }
 }
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
index 292f52b5091..5946be9fe5b 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
@@ -25,11 +25,13 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
 import java.util.function.Consumer;
 import lombok.AccessLevel;
 import lombok.Data;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.functions.api.Function;
 import org.apache.pulsar.functions.api.Record;
 
@@ -57,13 +59,26 @@ public class JavaInstance implements AutoCloseable {
     private final ExecutorService executor;
     @Getter
     private final LinkedBlockingQueue<AsyncFuncRequest> pendingAsyncRequests;
+    @Getter
+    private final Semaphore asyncRequestsConcurrencyLimiter;
+    private final boolean asyncPreserveInputOrderForOutputMessages;
 
     public JavaInstance(ContextImpl contextImpl, Object userClassObject, 
InstanceConfig instanceConfig) {
 
         this.context = contextImpl;
         this.instanceConfig = instanceConfig;
         this.executor = Executors.newSingleThreadExecutor();
-        this.pendingAsyncRequests = new 
LinkedBlockingQueue<>(this.instanceConfig.getMaxPendingAsyncRequests());
+
+        asyncPreserveInputOrderForOutputMessages =
+                
resolveAsyncPreserveInputOrderForOutputMessages(instanceConfig);
+
+        if (asyncPreserveInputOrderForOutputMessages) {
+            this.pendingAsyncRequests = new 
LinkedBlockingQueue<>(this.instanceConfig.getMaxPendingAsyncRequests());
+            this.asyncRequestsConcurrencyLimiter = null;
+        } else {
+            this.pendingAsyncRequests = null;
+            this.asyncRequestsConcurrencyLimiter = new 
Semaphore(this.instanceConfig.getMaxPendingAsyncRequests());
+        }
 
         // create the functions
         if (userClassObject instanceof Function) {
@@ -73,6 +88,20 @@ public class JavaInstance implements AutoCloseable {
         }
     }
 
+    // resolve whether to preserve input order for output messages for async 
functions
+    private boolean 
resolveAsyncPreserveInputOrderForOutputMessages(InstanceConfig instanceConfig) {
+        // no need to preserve input order for output messages if the function 
returns Void type
+        boolean voidReturnType = instanceConfig.getFunctionDetails() != null
+                && instanceConfig.getFunctionDetails().getSink() != null
+                && 
Void.class.getName().equals(instanceConfig.getFunctionDetails().getSink().getTypeClassName());
+        if (voidReturnType) {
+            return false;
+        }
+
+        // preserve input order for output messages
+        return true;
+    }
+
     @VisibleForTesting
     public JavaExecutionResult handleMessage(Record<?> record, Object input) {
         return handleMessage(record, input, (rec, result) -> {
@@ -103,15 +132,33 @@ public class JavaInstance implements AutoCloseable {
         }
 
         if (output instanceof CompletableFuture) {
-            // Function is in format: Function<I, CompletableFuture<O>>
-            AsyncFuncRequest request = new AsyncFuncRequest(
-                record, (CompletableFuture) output
-            );
             try {
-                pendingAsyncRequests.put(request);
-                ((CompletableFuture) output).whenCompleteAsync((res, cause) -> 
{
+                if (asyncPreserveInputOrderForOutputMessages) {
+                    // Function is in format: Function<I, CompletableFuture<O>>
+                    AsyncFuncRequest request = new AsyncFuncRequest(
+                            record, (CompletableFuture) output
+                    );
+                    pendingAsyncRequests.put(request);
+                } else {
+                    asyncRequestsConcurrencyLimiter.acquire();
+                }
+                ((CompletableFuture<Object>) output).whenCompleteAsync((Object 
res, Throwable cause) -> {
                     try {
-                        processAsyncResults(asyncResultConsumer);
+                        if (asyncPreserveInputOrderForOutputMessages) {
+                            
processAsyncResultsInInputOrder(asyncResultConsumer);
+                        } else {
+                            try {
+                                JavaExecutionResult execResult = new 
JavaExecutionResult();
+                                if (cause != null) {
+                                    
execResult.setUserException(FutureUtil.unwrapCompletionException(cause));
+                                } else {
+                                    execResult.setResult(res);
+                                }
+                                asyncResultConsumer.accept(record, execResult);
+                            } finally {
+                                asyncRequestsConcurrencyLimiter.release();
+                            }
+                        }
                     } catch (Throwable innerException) {
                         // the thread used for processing async results failed
                         asyncFailureHandler.accept(innerException);
@@ -132,21 +179,20 @@ public class JavaInstance implements AutoCloseable {
         }
     }
 
-    private void processAsyncResults(JavaInstanceRunnable.AsyncResultConsumer 
resultConsumer) throws Exception {
+    // processes the async results in the input order so that the order of the 
result messages in the output topic
+    // are in the same order as the input
+    private void 
processAsyncResultsInInputOrder(JavaInstanceRunnable.AsyncResultConsumer 
resultConsumer)
+            throws Exception {
         AsyncFuncRequest asyncResult = pendingAsyncRequests.peek();
         while (asyncResult != null && asyncResult.getProcessResult().isDone()) 
{
             pendingAsyncRequests.remove(asyncResult);
-            JavaExecutionResult execResult = new JavaExecutionResult();
 
+            JavaExecutionResult execResult = new JavaExecutionResult();
             try {
                 Object result = asyncResult.getProcessResult().get();
                 execResult.setResult(result);
             } catch (ExecutionException e) {
-                if (e.getCause() instanceof Exception) {
-                    execResult.setUserException((Exception) e.getCause());
-                } else {
-                    execResult.setUserException(new Exception(e.getCause()));
-                }
+                
execResult.setUserException(FutureUtil.unwrapCompletionException(e));
             }
 
             resultConsumer.accept(asyncResult.getRecord(), execResult);
@@ -154,7 +200,6 @@ public class JavaInstance implements AutoCloseable {
             // peek the next result
             asyncResult = pendingAsyncRequests.peek();
         }
-
     }
 
     public void initialize() throws Exception {
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index ffbea9af800..c9b069f8a6f 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -389,7 +389,7 @@ public class JavaInstanceRunnable implements AutoCloseable, 
Runnable {
     @VisibleForTesting
     void handleResult(Record srcRecord, JavaExecutionResult result) throws 
Exception {
         if (result.getUserException() != null) {
-            Exception t = result.getUserException();
+            Throwable t = result.getUserException();
             log.warn("Encountered exception when processing message {}",
                     srcRecord, t);
             stats.incrUserExceptions(t);
diff --git 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java
 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java
index 5a333204293..b3fcef292e5 100644
--- 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java
+++ 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java
@@ -23,10 +23,13 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertSame;
+import static org.testng.Assert.assertTrue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.functions.api.Function;
@@ -245,4 +248,65 @@ public class JavaInstanceTest {
                super(msg);
        }
     }
+
+    @Test
+    public void testAsyncFunctionMaxPendingVoidResult() throws Exception {
+        CountDownLatch count = new CountDownLatch(1);
+        InstanceConfig instanceConfig = new InstanceConfig();
+        
instanceConfig.setFunctionDetails(org.apache.pulsar.functions.proto.Function.FunctionDetails.newBuilder()
+                
.setSink(org.apache.pulsar.functions.proto.Function.SinkSpec.newBuilder()
+                        .setTypeClassName(Void.class.getName())
+                        .build())
+                .build());
+        int pendingQueueSize = 3;
+        instanceConfig.setMaxPendingAsyncRequests(pendingQueueSize);
+        @Cleanup("shutdownNow")
+        ExecutorService executor = Executors.newCachedThreadPool();
+
+        Function<String, CompletableFuture<Void>> function = (input, context) 
-> {
+            CompletableFuture<Void> result  = new CompletableFuture<>();
+            executor.submit(() -> {
+                try {
+                    count.await();
+                    result.complete(null);
+                } catch (Exception e) {
+                    result.completeExceptionally(e);
+                }
+            });
+
+            return result;
+        };
+
+        JavaInstance instance = new JavaInstance(
+                mock(ContextImpl.class),
+                function,
+                instanceConfig);
+        String testString = "ABC123";
+
+        CountDownLatch resultsLatch = new CountDownLatch(3);
+
+        long startTime = System.currentTimeMillis();
+        assertEquals(pendingQueueSize, 
instance.getAsyncRequestsConcurrencyLimiter().availablePermits());
+        JavaInstanceRunnable.AsyncResultConsumer asyncResultConsumer = (rec, 
result) -> {
+            resultsLatch.countDown();
+        };
+        Consumer<Throwable> asyncFailureHandler = cause -> {
+        };
+        assertNull(instance.handleMessage(mock(Record.class), testString, 
asyncResultConsumer, asyncFailureHandler));
+        assertEquals(pendingQueueSize - 1, 
instance.getAsyncRequestsConcurrencyLimiter().availablePermits());
+        assertNull(instance.handleMessage(mock(Record.class), testString, 
asyncResultConsumer, asyncFailureHandler));
+        assertEquals(pendingQueueSize - 2, 
instance.getAsyncRequestsConcurrencyLimiter().availablePermits());
+        assertNull(instance.handleMessage(mock(Record.class), testString, 
asyncResultConsumer, asyncFailureHandler));
+        // no space left
+        assertEquals(0, 
instance.getAsyncRequestsConcurrencyLimiter().availablePermits());
+
+        count.countDown();
+
+        assertTrue(resultsLatch.await(5, TimeUnit.SECONDS));
+
+        long endTime = System.currentTimeMillis();
+
+        log.info("start:{} end:{} during:{}", startTime, endTime, endTime - 
startTime);
+        instance.close();
+    }
 }

Reply via email to