shibd commented on code in PR #20807: URL: https://github.com/apache/pulsar/pull/20807#discussion_r1284936688
########## pip/pip-281.md: ########## @@ -0,0 +1,103 @@ +# Title: [io] Add notifyError method on PushSource + +## Motivation + +In function framework, when [source.read()](https://github.com/apache/pulsar/blob/f7c0b3c49c9ad8c28d0b00aa30d727850eb8bc04/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java#L496-L506) method throw an exception, it will trigger close function instance. If it is in the k8s environment, it will be restarted, +you can use the [PushSource](https://github.com/apache/pulsar/blob/branch-3.0/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/PushSource.java) class and extend it to quickly implement the push message model. +It overrides the `read` method and provides the `consume` method for the user to call. + +However, if the source connector extends from the class, +it cannot notify the function framework if it encounters an exception while consuming data internally, +in other words, the function call `source.read()` never triggers an exception and never exits the process. + + +## Goals + +Add `notifyError` method on PushSource, This method can receive an exception and put the exception in the queue. The next time an exception is `read`, will throws exception. +```java + + public Record<T> read() throws Exception { + Record<T> record = queue.take(); + if (record instanceof ErrorNotifierRecord) { + throw ((ErrorNotifierRecord) record).getException(); + } + return record; + } + + + /** + * Allows the source to notify errors asynchronously. + * @param ex + */ + public void notifyError(Exception ex) { + consume(new ErrorNotifierRecord(ex)); + } +} +``` + +Just like the implementation of the current [BatchPushSource](https://github.com/apache/pulsar/blob/branch-3.0/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/BatchPushSource.java) + + +### Compatibility + +This PIP is to provide a method for users rather than introducing a new interface. + +- So it is forward compatible +- However, connector using this method are not backward compatible. +For example, the currently Kafka source connector compiles with version 3.1(include this pip) Pulsar dependencies and uses the `notifyError` method, +if it switches back to version 3.0(exclude this pip) Pulsar to compile, it will encounter compile errors. Review Comment: > What's the meaning of "(include this pip)"? That means the 3.1 version has this pip-related feature. > Does the Kafka source connector "use" a method? This is an example, in fact it is not currently used, and then this PIP will be PR to fix it > "Who" switches back to 3.0? Do you mean the Kafka source connector complies with Pulsar 3.0? Kafka source connector. No, I'm giving examples of incompatible scenarios. hi @Anonymitaet Please help take another look at the latest sentence, thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
