RobertIndie commented on code in PR #21143:
URL: https://github.com/apache/pulsar/pull/21143#discussion_r1324390010
##########
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java:
##########
@@ -340,16 +345,24 @@ public void run() {
// process the synchronous results
handleResult(currentRecord, result);
}
+
+ if (deathException != null) {
Review Comment:
If a fatal exception is thrown from source.read or sink.write, the thread
can be interrupted, preventing it from reaching this point.
This handles the corner case for the `function.process`. I have added the
comment here to minimize the confusion.
```
// Ideally the current java instance thread will be interrupted when the
deathException is set.
// But if the CompletableFuture returned by the Pulsar Function is completed
exceptionally(the
// function has invoked the fatal method) before being put into the
JavaInstance
// .pendingAsyncRequests, the interrupted exception may be thrown when
putting this future to
// JavaInstance.pendingAsyncRequests. The interrupted exception would be
caught by the JavaInstance
// and be skipped.
// Therefore, we need to handle this case by checking the deathException
here and rethrow it.
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]