eolivelli commented on a change in pull request #10270:
URL: https://github.com/apache/pulsar/pull/10270#discussion_r618703404



##########
File path: 
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
##########
@@ -65,64 +66,96 @@ public JavaInstance(ContextImpl contextImpl, Object 
userClassObject, InstanceCon
             this.javaUtilFunction = (java.util.function.Function) 
userClassObject;
         }
     }
-
-    public CompletableFuture<JavaExecutionResult> handleMessage(Record<?> 
record, Object input) {
-        if (context != null) {
-            context.setCurrentMessageContext(record);
-        }
-
-        final CompletableFuture<JavaExecutionResult> future = new 
CompletableFuture<>();
-        JavaExecutionResult executionResult = new JavaExecutionResult();
-
-        final Object output;
-
-        try {
-            if (function != null) {
-                output = function.process(input, context);
-            } else {
-                output = javaUtilFunction.apply(input);
-            }
+    
+    /**
+     * Invokes the function code against the given input data.
+     * 
+     * @param input - The input data provided to the Function code
+     * @return An ExecutionResult object that contains the function result 
along with any user exceptions
+     * that occurred when executing the Function code.
+     */
+    @SuppressWarnings("unchecked")
+       private JavaExecutionResult executeFunction(Object input) {
+       JavaExecutionResult executionResult = new JavaExecutionResult();
+       
+        try {  
+               final Object result = (function != null) ? 
function.process(input, context) :
+                       javaUtilFunction.apply(input);
+               
+            executionResult.setResult(result);  
         } catch (Exception ex) {
             executionResult.setUserException(ex);
-            future.complete(executionResult);
-            return future;
+        } 
+       
+       return executionResult;
+    }
+    
+    /**
+     * Used to handle asynchronous function requests.
+     * 
+     * @param future - The CompleteableFuture returned from the async function 
call.
+     * @param executionResult 
+     */
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+       private void handleAsync(final CompletableFuture future, 
JavaExecutionResult executionResult) {
+       try {
+            pendingAsyncRequests.put(future);
+        } catch (InterruptedException ie) {
+            log.warn("Exception while put Async requests", ie);
+            executionResult.setUserException(ie);
+            Thread.currentThread().interrupt();
         }
 
-        if (output instanceof CompletableFuture) {
-            // Function is in format: Function<I, CompletableFuture<O>>
-            try {
-                pendingAsyncRequests.put((CompletableFuture) output);
-            } catch (InterruptedException ie) {
-                log.warn("Exception while put Async requests", ie);
-                executionResult.setUserException(ie);
-                future.complete(executionResult);
-                return future;
+        future.whenCompleteAsync((functionResult, throwable) -> {
+            if (log.isDebugEnabled()) {
+              log.debug("Got result async: object: {}, throwable: {}", 
functionResult, throwable);
+            }
+            
+            if (throwable != null) {
+              executionResult.setUserException(new 
Exception((Throwable)throwable));      
             }
+          
+            pendingAsyncRequests.remove(future);
+            future.complete(functionResult);
+            
+        }, executor);
+        
+        
+               try {
+                       executionResult.setResult(future.get());
+               } catch (InterruptedException iEx) {
+                       Thread.currentThread().interrupt();

Review comment:
       I am not sure about this case.
   
   should we call `executionResult.setUserException(iEx);` ?
   
   In theory it is not user code that threw this exception. Do we have a way 
to report it as a system exception? 
   By the way, we should "break" and throw an error, or at least log it.

##########
File path: 
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
##########
@@ -65,64 +66,96 @@ public JavaInstance(ContextImpl contextImpl, Object 
userClassObject, InstanceCon
             this.javaUtilFunction = (java.util.function.Function) 
userClassObject;
         }
     }
-
-    public CompletableFuture<JavaExecutionResult> handleMessage(Record<?> 
record, Object input) {
-        if (context != null) {
-            context.setCurrentMessageContext(record);
-        }
-
-        final CompletableFuture<JavaExecutionResult> future = new 
CompletableFuture<>();
-        JavaExecutionResult executionResult = new JavaExecutionResult();
-
-        final Object output;
-
-        try {
-            if (function != null) {
-                output = function.process(input, context);
-            } else {
-                output = javaUtilFunction.apply(input);
-            }
+    
+    /**
+     * Invokes the function code against the given input data.
+     * 
+     * @param input - The input data provided to the Function code
+     * @return An ExecutionResult object that contains the function result 
along with any user exceptions
+     * that occurred when executing the Function code.
+     */
+    @SuppressWarnings("unchecked")
+       private JavaExecutionResult executeFunction(Object input) {
+       JavaExecutionResult executionResult = new JavaExecutionResult();
+       
+        try {  
+               final Object result = (function != null) ? 
function.process(input, context) :
+                       javaUtilFunction.apply(input);
+               
+            executionResult.setResult(result);  
         } catch (Exception ex) {
             executionResult.setUserException(ex);
-            future.complete(executionResult);
-            return future;
+        } 
+       
+       return executionResult;
+    }
+    
+    /**
+     * Used to handle asynchronous function requests.
+     * 
+     * @param future - The CompleteableFuture returned from the async function 
call.
+     * @param executionResult 
+     */
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+       private void handleAsync(final CompletableFuture future, 
JavaExecutionResult executionResult) {
+       try {
+            pendingAsyncRequests.put(future);
+        } catch (InterruptedException ie) {
+            log.warn("Exception while put Async requests", ie);
+            executionResult.setUserException(ie);
+            Thread.currentThread().interrupt();
         }
 
-        if (output instanceof CompletableFuture) {
-            // Function is in format: Function<I, CompletableFuture<O>>
-            try {
-                pendingAsyncRequests.put((CompletableFuture) output);
-            } catch (InterruptedException ie) {
-                log.warn("Exception while put Async requests", ie);
-                executionResult.setUserException(ie);
-                future.complete(executionResult);
-                return future;
+        future.whenCompleteAsync((functionResult, throwable) -> {
+            if (log.isDebugEnabled()) {
+              log.debug("Got result async: object: {}, throwable: {}", 
functionResult, throwable);
+            }
+            
+            if (throwable != null) {
+              executionResult.setUserException(new 
Exception((Throwable)throwable));      
             }
+          
+            pendingAsyncRequests.remove(future);
+            future.complete(functionResult);
+            
+        }, executor);
+        
+        
+               try {
+                       executionResult.setResult(future.get());
+               } catch (InterruptedException iEx) {
+                       Thread.currentThread().interrupt();
+               } catch (ExecutionException eEx) {
+                       executionResult.setUserException(eEx);

Review comment:
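       Should this be `eEx.getCause()`, so that the underlying exception is reported rather than the `ExecutionException` wrapper?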




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to