shibd commented on code in PR #21143:
URL: https://github.com/apache/pulsar/pull/21143#discussion_r1324239986


##########
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java:
##########
@@ -340,16 +345,24 @@ public void run() {
                     // process the synchronous results
                     handleResult(currentRecord, result);
                 }
+
+                if (deathException != null) {
+                    throw deathException;
+                }
             }
         } catch (Throwable t) {
-            log.error("[{}] Uncaught exception in Java Instance", 
FunctionCommon.getFullyQualifiedInstanceId(
-                    instanceConfig.getFunctionDetails().getTenant(),
-                    instanceConfig.getFunctionDetails().getNamespace(),
-                    instanceConfig.getFunctionDetails().getName(),
-                    instanceConfig.getInstanceId()), t);
-            deathException = t;
+            if (deathException != null) {
+                log.info("Fatal exception occurred in the instance", 
deathException);

Review Comment:
   We should print the error level log and add the instance ID 
(getFullyQualifiedInstanceId). 
   
   I think it is possible to reuse L357~362.



##########
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java:
##########
@@ -340,16 +345,24 @@ public void run() {
                     // process the synchronous results
                     handleResult(currentRecord, result);
                 }
+
+                if (deathException != null) {

Review Comment:
   We need to judge `deathException != null` in three places
   1. When `source.read` just finished.
   2. When `function.process` just finished.
   3. When `sink.write` just finished.
   
   This can catch the exception in advance and end the thread.



##########
pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/BaseContext.java:
##########
@@ -217,4 +217,10 @@ default ClientBuilder getPulsarClientBuilder() {
         throw new UnsupportedOperationException("not implemented");
     }
 
+    /**
+     * Terminate the function instance with a fatal exception.
+     *
+     * @param t the fatal exception to be raised
+     */
+    void fatal(Throwable t);

Review Comment:
   When the function/connector calls this method, we don't break user threads 
to continue executing. Is this reasonable?
   
   For examples: 
   
   ```java
   
       public static class TestSinkConnector implements Sink<String> {
           SinkContext context;
   
           @Override
           public void open(Map config, SinkContext sinkContext) throws 
Exception {
               this.context = sinkContext;
           }
   
           @Override
           public void write(Record<String> record) throws Exception {
               new Thread(() -> {
                   if 
(FailComponentType.FAIL_SINK.toString().equals(record.getValue())) {
                       context.fatal(new 
Exception(FailComponentType.FAIL_SINK.toString()));
                   }
                   // continue to do other things.
               }).start();
           }
   
           @Override
           public void close() throws Exception {
   
           }
       }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to