Anonymitaet commented on code in PR #20807:
URL: https://github.com/apache/pulsar/pull/20807#discussion_r1282691288


##########
pip/pip-281.md:
##########
@@ -0,0 +1,103 @@
+# Title: [io] Add notifyError method on PushSource
+
+## Motivation
+
+In function framework, when 
[source.read()](https://github.com/apache/pulsar/blob/f7c0b3c49c9ad8c28d0b00aa30d727850eb8bc04/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java#L496-L506)
 method throw exception, it will trigger close function instance. If it is in 
the k8s environment, it will be restarted
+
+On io source connector, we provide 
[PushSource](https://github.com/apache/pulsar/blob/branch-3.0/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/PushSource.java)
 class for users to use, and users can extend this class to quickly implement 
the push message model.
+It overrides the `read` method and provides the `consume` method for the user 
to call.
+
+However, if the source connector that extends from the class, 
+it cannot notify the function framework if it encounters an exception while 
consuming data internally, 
+in other words, the function call `source.read()` never triggers an exception 
and never exits the process.
+
+
+## Goals
+
+Add `notifyError` method on PushSource, This method can receive an exception 
and put the exception in the queue. The next time an exception is `read`, will 
throws it.
+```java
+
+  public Record<T> read() throws Exception {
+    Record<T> record = queue.take();
+    if (record instanceof ErrorNotifierRecord) {
+      throw ((ErrorNotifierRecord) record).getException();
+    }
+    return record;
+  }
+
+
+  /**
+   * Allows the source to notify errors asynchronously.
+   * @param ex
+   */
+  public void notifyError(Exception ex) {
+    consume(new ErrorNotifierRecord(ex));
+  }
+}
+```
+
+Just like the implementation of the current 
[BatchPushSource](https://github.com/apache/pulsar/blob/branch-3.0/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/BatchPushSource.java)
+
+
+### Compatibility
+
+This PIP is to provide a method for users to use, not an new interface. 
+
+- So it is forward compatible
+- However, connector using this method are not backward compatible.
+For example, the Kafka source connector compiles with version 3.1(include this 
pip) Pulsar dependencies and uses the `notifyError` method, and if it switches 
back to version 3.0(exclude this pip) Pulsar compilation, it will encounter 
compilation errors.

Review Comment:
   Sorry, it's still unclear. What's the relationship between "the Kafka source 
connector compiles with version 3.1(include this pip) Pulsar dependencies and 
uses the `notifyError` method," and the latter sentence?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to