shibd commented on code in PR #21143:
URL: https://github.com/apache/pulsar/pull/21143#discussion_r1324239986
##########
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java:
##########
@@ -340,16 +345,24 @@ public void run() {
// process the synchronous results
handleResult(currentRecord, result);
}
+
+ if (deathException != null) {
+ throw deathException;
+ }
}
} catch (Throwable t) {
- log.error("[{}] Uncaught exception in Java Instance",
FunctionCommon.getFullyQualifiedInstanceId(
- instanceConfig.getFunctionDetails().getTenant(),
- instanceConfig.getFunctionDetails().getNamespace(),
- instanceConfig.getFunctionDetails().getName(),
- instanceConfig.getInstanceId()), t);
- deathException = t;
+ if (deathException != null) {
+ log.info("Fatal exception occurred in the instance",
deathException);
Review Comment:
We should print the error level log and add the instance ID
(getFullyQualifiedInstanceId).
I think it is possible to reuse L357~362.
##########
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java:
##########
@@ -340,16 +345,24 @@ public void run() {
// process the synchronous results
handleResult(currentRecord, result);
}
+
+ if (deathException != null) {
Review Comment:
We need to judge `deathException != null` in three places
1. When `source.read` just finished.
2. When `function.process` just finished.
3. When `sink.write` just finished.
This can catch the exception in advance and end the thread.
##########
pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/BaseContext.java:
##########
@@ -217,4 +217,10 @@ default ClientBuilder getPulsarClientBuilder() {
throw new UnsupportedOperationException("not implemented");
}
+ /**
+ * Terminate the function instance with a fatal exception.
+ *
+ * @param t the fatal exception to be raised
+ */
+ void fatal(Throwable t);
Review Comment:
When the function/connector calls this method, we don't break user threads
to continue executing. Is this reasonable?
For examples:
```java
public static class TestSinkConnector implements Sink<String> {
SinkContext context;
@Override
public void open(Map config, SinkContext sinkContext) throws
Exception {
this.context = sinkContext;
}
@Override
public void write(Record<String> record) throws Exception {
new Thread(() -> {
if
(FailComponentType.FAIL_SINK.toString().equals(record.getValue())) {
context.fatal(new
Exception(FailComponentType.FAIL_SINK.toString()));
}
// continue to do other things.
}).start();
}
@Override
public void close() throws Exception {
}
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]