shibd commented on code in PR #20807:
URL: https://github.com/apache/pulsar/pull/20807#discussion_r1264803961
##########
pip/pip-281.md:
##########
@@ -0,0 +1,109 @@
+# Title: [io] Add notifyError method on PushSource
+
+## Motivation
+
+In function framework, when [source.read()](https://github.com/apache/pulsar/blob/f7c0b3c49c9ad8c28d0b00aa30d727850eb8bc04/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java#L496-L506) method throw exception, it will trigger close function instance. If it is in the k8s environment, it will be restarted
+
+On io source connector, we provide [PushSource](https://github.com/apache/pulsar/blob/branch-3.0/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/PushSource.java) class for users to use, and users can extend this class to quickly implement the push message model.
+It overrides the `read` method and provides the `consume` method for the user to call.
+
+However, if the source connector that extends from the class,
+it cannot notify the function framework if it encounters an exception while consuming data internally,
+in other words, the function call `source.read()` never triggers an exception and never exits the process.
+
+
+## Goals
+
+Add `notifyError` method on PushSource, This method can receive an exception and put the exception in the queue. The next time an exception is `read`, will throws it.
+```java
+
+    public Record<T> read() throws Exception {
+        Record<T> record = queue.take();
+        if (record instanceof ErrorNotifierRecord) {
+            throw ((ErrorNotifierRecord) record).getException();
+        }
+        return record;
+    }
+
+
+    /**
+     * Allows the source to notify errors asynchronously.
+     * @param ex
+     */
+    public void notifyError(Exception ex) {
+        consume(new ErrorNotifierRecord(ex));
+    }
+}
+```
+
+Just like the implementation of the current [BatchPushSource](https://github.com/apache/pulsar/blob/branch-3.0/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/BatchPushSource.java)
+
+
+BTW: This is a very simple change and is forward compatible.
Sorry, I didn't notice that this change requires PIP before, so the related PRs have been merged.
+- https://github.com/apache/pulsar/pull/20791
+
+If this PIP vote does not pass, I revert this PR after that.
+
+
+### Compatibility
+
+This PIP is to provide a method for users to use, not an new interface.

Review Comment:
> I think the new interface here is referred to the Java interface?

Yes. What I mean is that this is not a new interface method, so users are not forced to implement it. That is why the change is forward compatible.
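
To illustrate the compatibility point, here is a minimal, self-contained sketch. It does not use the real Pulsar classes: `SketchRecord`, `SketchPushSource`, and `LegacySource` are hypothetical stand-ins, and the queue handling is simplified. It only shows that a concrete `notifyError` added to an abstract base class is inherited by a subclass written before the method existed, and that `read()` then rethrows the queued exception.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Simplified stand-ins for illustration only; these are NOT the Pulsar classes.
final class NotifyErrorSketch {

    /** Hypothetical minimal record type (the real Record<T> has more methods). */
    interface SketchRecord<T> {
        T getValue();
    }

    /** Marker record that carries an exception through the queue. */
    static final class ErrorNotifierRecord<T> implements SketchRecord<T> {
        private final Exception exception;

        ErrorNotifierRecord(Exception exception) {
            this.exception = exception;
        }

        @Override
        public T getValue() {
            return null;
        }

        Exception getException() {
            return exception;
        }
    }

    /** Abstract base class; notifyError is concrete, so subclasses simply inherit it. */
    abstract static class SketchPushSource<T> {
        // Assumption: an unbounded queue keeps the sketch short.
        private final BlockingQueue<SketchRecord<T>> queue = new LinkedBlockingQueue<>();

        /** The only method a subclass is forced to implement in this sketch. */
        public abstract void open();

        public SketchRecord<T> read() throws Exception {
            SketchRecord<T> record = queue.take();
            if (record instanceof ErrorNotifierRecord) {
                // Surface the asynchronously reported error to the caller of read().
                throw ((ErrorNotifierRecord<T>) record).getException();
            }
            return record;
        }

        public void consume(SketchRecord<T> record) {
            queue.add(record);
        }

        public void notifyError(Exception ex) {
            consume(new ErrorNotifierRecord<>(ex));
        }
    }

    /** A subclass written before notifyError existed; it compiles unchanged. */
    static final class LegacySource extends SketchPushSource<String> {
        @Override
        public void open() {
            // Nothing to set up in this sketch.
        }
    }

    public static void main(String[] args) throws Exception {
        LegacySource source = new LegacySource();
        source.consume(() -> "hello");
        System.out.println(source.read().getValue());

        source.notifyError(new IllegalStateException("connection lost"));
        try {
            source.read();
        } catch (IllegalStateException e) {
            System.out.println("read() rethrew: " + e.getMessage());
        }
    }
}
```

If `notifyError` were instead declared as an abstract method (or added to an interface without a default body), `LegacySource` would stop compiling until it supplied an implementation, which is the situation this PIP avoids.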
