This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 789b9ccf7a0afdf1d9fdba1bc197f36476aa687c Author: Lari Hotari <[email protected]> AuthorDate: Tue Dec 17 18:33:25 2024 +0200 [improve][fn] Improve implementation for maxPendingAsyncRequests async concurrency limit when return type is CompletableFuture<Void> (#23708) (cherry picked from commit 8ad67776fc0787fe857d999a72f9df1e95e4210d) --- .../functions/instance/JavaExecutionResult.java | 4 +- .../pulsar/functions/instance/JavaInstance.java | 77 +++++++++++++++++----- .../functions/instance/JavaInstanceRunnable.java | 2 +- .../functions/instance/JavaInstanceTest.java | 64 ++++++++++++++++++ 4 files changed, 127 insertions(+), 20 deletions(-) diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaExecutionResult.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaExecutionResult.java index 7af238154d6..5856600196b 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaExecutionResult.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaExecutionResult.java @@ -27,13 +27,11 @@ import lombok.Data; */ @Data public class JavaExecutionResult { - private Exception userException; - private Exception systemException; + private Throwable userException; private Object result; public void reset() { setUserException(null); - setSystemException(null); setResult(null); } } diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java index 292f52b5091..5946be9fe5b 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java @@ -25,11 +25,13 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.Semaphore; import java.util.function.Consumer; import lombok.AccessLevel; import lombok.Data; import lombok.Getter; import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.functions.api.Function; import org.apache.pulsar.functions.api.Record; @@ -57,13 +59,26 @@ public class JavaInstance implements AutoCloseable { private final ExecutorService executor; @Getter private final LinkedBlockingQueue<AsyncFuncRequest> pendingAsyncRequests; + @Getter + private final Semaphore asyncRequestsConcurrencyLimiter; + private final boolean asyncPreserveInputOrderForOutputMessages; public JavaInstance(ContextImpl contextImpl, Object userClassObject, InstanceConfig instanceConfig) { this.context = contextImpl; this.instanceConfig = instanceConfig; this.executor = Executors.newSingleThreadExecutor(); - this.pendingAsyncRequests = new LinkedBlockingQueue<>(this.instanceConfig.getMaxPendingAsyncRequests()); + + asyncPreserveInputOrderForOutputMessages = + resolveAsyncPreserveInputOrderForOutputMessages(instanceConfig); + + if (asyncPreserveInputOrderForOutputMessages) { + this.pendingAsyncRequests = new LinkedBlockingQueue<>(this.instanceConfig.getMaxPendingAsyncRequests()); + this.asyncRequestsConcurrencyLimiter = null; + } else { + this.pendingAsyncRequests = null; + this.asyncRequestsConcurrencyLimiter = new Semaphore(this.instanceConfig.getMaxPendingAsyncRequests()); + } // create the functions if (userClassObject instanceof Function) { @@ -73,6 +88,20 @@ public class JavaInstance implements AutoCloseable { } } + // resolve whether to preserve input order for output messages for async functions + private boolean resolveAsyncPreserveInputOrderForOutputMessages(InstanceConfig instanceConfig) { + // no need to preserve input order for output messages if the function returns Void type + boolean voidReturnType = instanceConfig.getFunctionDetails() != null + && instanceConfig.getFunctionDetails().getSink() != null + && Void.class.getName().equals(instanceConfig.getFunctionDetails().getSink().getTypeClassName()); + if (voidReturnType) { + return false; + } + + // preserve input order for output messages + return true; + } + @VisibleForTesting public JavaExecutionResult handleMessage(Record<?> record, Object input) { return handleMessage(record, input, (rec, result) -> { @@ -103,15 +132,33 @@ public class JavaInstance implements AutoCloseable { } if (output instanceof CompletableFuture) { - // Function is in format: Function<I, CompletableFuture<O>> - AsyncFuncRequest request = new AsyncFuncRequest( - record, (CompletableFuture) output - ); try { - pendingAsyncRequests.put(request); - ((CompletableFuture) output).whenCompleteAsync((res, cause) -> { + if (asyncPreserveInputOrderForOutputMessages) { + // Function is in format: Function<I, CompletableFuture<O>> + AsyncFuncRequest request = new AsyncFuncRequest( + record, (CompletableFuture) output + ); + pendingAsyncRequests.put(request); + } else { + asyncRequestsConcurrencyLimiter.acquire(); + } + ((CompletableFuture<Object>) output).whenCompleteAsync((Object res, Throwable cause) -> { try { - processAsyncResults(asyncResultConsumer); + if (asyncPreserveInputOrderForOutputMessages) { + processAsyncResultsInInputOrder(asyncResultConsumer); + } else { + try { + JavaExecutionResult execResult = new JavaExecutionResult(); + if (cause != null) { + execResult.setUserException(FutureUtil.unwrapCompletionException(cause)); + } else { + execResult.setResult(res); + } + asyncResultConsumer.accept(record, execResult); + } finally { + asyncRequestsConcurrencyLimiter.release(); + } + } } catch (Throwable innerException) { // the thread used for processing async results failed asyncFailureHandler.accept(innerException); @@ -132,21 +179,20 @@ public class JavaInstance implements AutoCloseable { } } - private void processAsyncResults(JavaInstanceRunnable.AsyncResultConsumer resultConsumer) throws Exception { + // processes the async results in the input order so that the order of the result messages in the output topic + // are in the same order as the input + private void processAsyncResultsInInputOrder(JavaInstanceRunnable.AsyncResultConsumer resultConsumer) + throws Exception { AsyncFuncRequest asyncResult = pendingAsyncRequests.peek(); while (asyncResult != null && asyncResult.getProcessResult().isDone()) { pendingAsyncRequests.remove(asyncResult); - JavaExecutionResult execResult = new JavaExecutionResult(); + JavaExecutionResult execResult = new JavaExecutionResult(); try { Object result = asyncResult.getProcessResult().get(); execResult.setResult(result); } catch (ExecutionException e) { - if (e.getCause() instanceof Exception) { - execResult.setUserException((Exception) e.getCause()); - } else { - execResult.setUserException(new Exception(e.getCause())); - } + execResult.setUserException(FutureUtil.unwrapCompletionException(e)); } resultConsumer.accept(asyncResult.getRecord(), execResult); @@ -154,7 +200,6 @@ public class JavaInstance implements AutoCloseable { // peek the next result asyncResult = pendingAsyncRequests.peek(); } - } public void initialize() throws Exception { diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java index baf0c5f7400..4f811c14704 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java @@ -423,7 +423,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { @VisibleForTesting void handleResult(Record srcRecord, JavaExecutionResult result) throws Exception { if (result.getUserException() != null) { - Exception t = result.getUserException(); + Throwable t = result.getUserException(); log.warn("Encountered exception when processing message {}", srcRecord, t); stats.incrUserExceptions(t); diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java index 5a333204293..b3fcef292e5 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java @@ -23,10 +23,13 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertSame; +import static org.testng.Assert.assertTrue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.functions.api.Function; @@ -245,4 +248,65 @@ public class JavaInstanceTest { super(msg); } } + + @Test + public void testAsyncFunctionMaxPendingVoidResult() throws Exception { + CountDownLatch count = new CountDownLatch(1); + InstanceConfig instanceConfig = new InstanceConfig(); + instanceConfig.setFunctionDetails(org.apache.pulsar.functions.proto.Function.FunctionDetails.newBuilder() + .setSink(org.apache.pulsar.functions.proto.Function.SinkSpec.newBuilder() + .setTypeClassName(Void.class.getName()) + .build()) + .build()); + int pendingQueueSize = 3; + instanceConfig.setMaxPendingAsyncRequests(pendingQueueSize); + @Cleanup("shutdownNow") + ExecutorService executor = Executors.newCachedThreadPool(); + + Function<String, CompletableFuture<Void>> function = (input, context) -> { + CompletableFuture<Void> result = new CompletableFuture<>(); + executor.submit(() -> { + try { + count.await(); + result.complete(null); + } catch (Exception e) { + result.completeExceptionally(e); + } + }); + + return result; + }; + + JavaInstance instance = new JavaInstance( + mock(ContextImpl.class), + function, + instanceConfig); + String testString = "ABC123"; + + CountDownLatch resultsLatch = new CountDownLatch(3); + + long startTime = System.currentTimeMillis(); + assertEquals(pendingQueueSize, instance.getAsyncRequestsConcurrencyLimiter().availablePermits()); + JavaInstanceRunnable.AsyncResultConsumer asyncResultConsumer = (rec, result) -> { + resultsLatch.countDown(); + }; + Consumer<Throwable> asyncFailureHandler = cause -> { + }; + assertNull(instance.handleMessage(mock(Record.class), testString, asyncResultConsumer, asyncFailureHandler)); + assertEquals(pendingQueueSize - 1, instance.getAsyncRequestsConcurrencyLimiter().availablePermits()); + assertNull(instance.handleMessage(mock(Record.class), testString, asyncResultConsumer, asyncFailureHandler)); + assertEquals(pendingQueueSize - 2, instance.getAsyncRequestsConcurrencyLimiter().availablePermits()); + assertNull(instance.handleMessage(mock(Record.class), testString, asyncResultConsumer, asyncFailureHandler)); + // no space left + assertEquals(0, instance.getAsyncRequestsConcurrencyLimiter().availablePermits()); + + count.countDown(); + + assertTrue(resultsLatch.await(5, TimeUnit.SECONDS)); + + long endTime = System.currentTimeMillis(); + + log.info("start:{} end:{} during:{}", startTime, endTime, endTime - startTime); + instance.close(); + } }
