RobertIndie commented on code in PR #21143:
URL: https://github.com/apache/pulsar/pull/21143#discussion_r1324390010


##########
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java:
##########
@@ -340,16 +345,24 @@ public void run() {
                     // process the synchronous results
                     handleResult(currentRecord, result);
                 }
+
+                if (deathException != null) {

Review Comment:
   If a fatal exception is thrown from source.read or sink.write, the thread 
can be interrupted, preventing it from reaching this point.
   This handles the corner case for the `function.process`. I have added the 
comment here to minimize the confusion.
   
   ```
   // Ideally the current java instance thread will be interrupted when the 
deathException is set.
   // But if the CompletableFuture returned by the Pulsar Function is completed 
exceptionally(the
   // function has invoked the fatal method) before being put into the 
JavaInstance
   // .pendingAsyncRequests, the interrupted exception may be thrown when 
putting this future to
   // JavaInstance.pendingAsyncRequests. The interrupted exception would be 
caught by the JavaInstance
   // and be skipped.
   // Therefore, we need to handle this case by checking the deathException 
here and rethrow it.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to