Anonymitaet commented on code in PR #20807: URL: https://github.com/apache/pulsar/pull/20807#discussion_r1281216968
########## pip/pip-281.md: ########## @@ -0,0 +1,103 @@ +# Title: [io] Add notifyError method on PushSource + +## Motivation + +In function framework, when [source.read()](https://github.com/apache/pulsar/blob/f7c0b3c49c9ad8c28d0b00aa30d727850eb8bc04/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java#L496-L506) method throw exception, it will trigger close function instance. If it is in the k8s environment, it will be restarted Review Comment: ```suggestion In function framework, when [source.read()](https://github.com/apache/pulsar/blob/f7c0b3c49c9ad8c28d0b00aa30d727850eb8bc04/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java#L496-L506) method throws an exception, it will trigger close function instance. If it is in the k8s environment, it will be restarted ``` ########## pip/pip-281.md: ########## @@ -0,0 +1,103 @@ +# Title: [io] Add notifyError method on PushSource + +## Motivation + +In function framework, when [source.read()](https://github.com/apache/pulsar/blob/f7c0b3c49c9ad8c28d0b00aa30d727850eb8bc04/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java#L496-L506) method throw exception, it will trigger close function instance. If it is in the k8s environment, it will be restarted + +On io source connector, we provide [PushSource](https://github.com/apache/pulsar/blob/branch-3.0/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/PushSource.java) class for users to use, and users can extend this class to quickly implement the push message model. +It overrides the `read` method and provides the `consume` method for the user to call. + +However, if the source connector that extends from the class, Review Comment: ```suggestion However, if the source connector extends from the class, ``` ########## pip/pip-281.md: ########## @@ -0,0 +1,103 @@ +# Title: [io] Add notifyError method on PushSource + +## Motivation + +In function framework, when [source.read()](https://github.com/apache/pulsar/blob/f7c0b3c49c9ad8c28d0b00aa30d727850eb8bc04/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java#L496-L506) method throw exception, it will trigger close function instance. If it is in the k8s environment, it will be restarted + +On io source connector, we provide [PushSource](https://github.com/apache/pulsar/blob/branch-3.0/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/PushSource.java) class for users to use, and users can extend this class to quickly implement the push message model. +It overrides the `read` method and provides the `consume` method for the user to call. + +However, if the source connector that extends from the class, +it cannot notify the function framework if it encounters an exception while consuming data internally, +in other words, the function call `source.read()` never triggers an exception and never exits the process. + + +## Goals + +Add `notifyError` method on PushSource, This method can receive an exception and put the exception in the queue. The next time an exception is `read`, will throws it. +```java + + public Record<T> read() throws Exception { + Record<T> record = queue.take(); + if (record instanceof ErrorNotifierRecord) { + throw ((ErrorNotifierRecord) record).getException(); + } + return record; + } + + + /** + * Allows the source to notify errors asynchronously. + * @param ex + */ + public void notifyError(Exception ex) { + consume(new ErrorNotifierRecord(ex)); + } +} +``` + +Just like the implementation of the current [BatchPushSource](https://github.com/apache/pulsar/blob/branch-3.0/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/BatchPushSource.java) + + +### Compatibility + +This PIP is to provide a method for users to use, not an new interface. Review Comment: ```suggestion This PIP is to provide a method for users rather than introducing a new interface. ``` ########## pip/pip-281.md: ########## @@ -0,0 +1,103 @@ +# Title: [io] Add notifyError method on PushSource + +## Motivation + +In function framework, when [source.read()](https://github.com/apache/pulsar/blob/f7c0b3c49c9ad8c28d0b00aa30d727850eb8bc04/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java#L496-L506) method throw exception, it will trigger close function instance. If it is in the k8s environment, it will be restarted + +On io source connector, we provide [PushSource](https://github.com/apache/pulsar/blob/branch-3.0/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/PushSource.java) class for users to use, and users can extend this class to quickly implement the push message model. +It overrides the `read` method and provides the `consume` method for the user to call. + +However, if the source connector that extends from the class, +it cannot notify the function framework if it encounters an exception while consuming data internally, +in other words, the function call `source.read()` never triggers an exception and never exits the process. + + +## Goals + +Add `notifyError` method on PushSource, This method can receive an exception and put the exception in the queue. The next time an exception is `read`, will throws it. Review Comment: ```suggestion Add the `notifyError` method on PushSource. This method can receive an exception and put the exception in the queue. The next time an exception is `read`, it will throw it. ``` throw "it", throw what? ########## pip/pip-281.md: ########## @@ -0,0 +1,103 @@ +# Title: [io] Add notifyError method on PushSource + +## Motivation + +In function framework, when [source.read()](https://github.com/apache/pulsar/blob/f7c0b3c49c9ad8c28d0b00aa30d727850eb8bc04/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java#L496-L506) method throw exception, it will trigger close function instance. If it is in the k8s environment, it will be restarted + +On io source connector, we provide [PushSource](https://github.com/apache/pulsar/blob/branch-3.0/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/PushSource.java) class for users to use, and users can extend this class to quickly implement the push message model. +It overrides the `read` method and provides the `consume` method for the user to call. + +However, if the source connector that extends from the class, +it cannot notify the function framework if it encounters an exception while consuming data internally, +in other words, the function call `source.read()` never triggers an exception and never exits the process. + + +## Goals + +Add `notifyError` method on PushSource, This method can receive an exception and put the exception in the queue. The next time an exception is `read`, will throws it. +```java + + public Record<T> read() throws Exception { + Record<T> record = queue.take(); + if (record instanceof ErrorNotifierRecord) { + throw ((ErrorNotifierRecord) record).getException(); + } + return record; + } + + + /** + * Allows the source to notify errors asynchronously. + * @param ex + */ + public void notifyError(Exception ex) { + consume(new ErrorNotifierRecord(ex)); + } +} +``` + +Just like the implementation of the current [BatchPushSource](https://github.com/apache/pulsar/blob/branch-3.0/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/BatchPushSource.java) + + +### Compatibility + +This PIP is to provide a method for users to use, not an new interface. + +- So it is forward compatible +- However, connector using this method are not backward compatible. +For example, the Kafka source connector compiles with version 3.1(include this pip) Pulsar dependencies and uses the `notifyError` method, and if it switches back to version 3.0(exclude this pip) Pulsar compilation, it will encounter compilation errors. + +### In Scope + +Use this method, Like all current source connectors that extends the PushSource, process exit can be implemented. Such as: Review Comment: This sentence is unclear. What do you intend to mean? ########## pip/pip-281.md: ########## @@ -0,0 +1,103 @@ +# Title: [io] Add notifyError method on PushSource + +## Motivation + +In function framework, when [source.read()](https://github.com/apache/pulsar/blob/f7c0b3c49c9ad8c28d0b00aa30d727850eb8bc04/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java#L496-L506) method throw exception, it will trigger close function instance. If it is in the k8s environment, it will be restarted + +On io source connector, we provide [PushSource](https://github.com/apache/pulsar/blob/branch-3.0/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/PushSource.java) class for users to use, and users can extend this class to quickly implement the push message model. +It overrides the `read` method and provides the `consume` method for the user to call. + +However, if the source connector that extends from the class, +it cannot notify the function framework if it encounters an exception while consuming data internally, +in other words, the function call `source.read()` never triggers an exception and never exits the process. + + +## Goals + +Add `notifyError` method on PushSource, This method can receive an exception and put the exception in the queue. The next time an exception is `read`, will throws it. +```java + + public Record<T> read() throws Exception { + Record<T> record = queue.take(); + if (record instanceof ErrorNotifierRecord) { + throw ((ErrorNotifierRecord) record).getException(); + } + return record; + } + + + /** + * Allows the source to notify errors asynchronously. + * @param ex + */ + public void notifyError(Exception ex) { + consume(new ErrorNotifierRecord(ex)); + } +} +``` + +Just like the implementation of the current [BatchPushSource](https://github.com/apache/pulsar/blob/branch-3.0/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/BatchPushSource.java) + + +### Compatibility + +This PIP is to provide a method for users to use, not an new interface. + +- So it is forward compatible +- However, connector using this method are not backward compatible. +For example, the Kafka source connector compiles with version 3.1(include this pip) Pulsar dependencies and uses the `notifyError` method, and if it switches back to version 3.0(exclude this pip) Pulsar compilation, it will encounter compilation errors. Review Comment: This is a long sentence with unclear clauses. What do you intend to mean? ########## pip/pip-281.md: ########## @@ -0,0 +1,103 @@ +# Title: [io] Add notifyError method on PushSource + +## Motivation + +In function framework, when [source.read()](https://github.com/apache/pulsar/blob/f7c0b3c49c9ad8c28d0b00aa30d727850eb8bc04/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java#L496-L506) method throw exception, it will trigger close function instance. If it is in the k8s environment, it will be restarted + +On io source connector, we provide [PushSource](https://github.com/apache/pulsar/blob/branch-3.0/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/PushSource.java) class for users to use, and users can extend this class to quickly implement the push message model. Review Comment: ```suggestion on IO source connector, you can use the [PushSource](https://github.com/apache/pulsar/blob/branch-3.0/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/PushSource.java) class and extend it to quickly implement the push message model. ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
