This is an automated email from the ASF dual-hosted git repository.
zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 88231f732a8 [improve][pip] PIP-297: Support terminating Function &
Connector with the fatal exception (#21079)
88231f732a8 is described below
commit 88231f732a804fe1c75c9f082500762b3d1faa6c
Author: Zike Yang <[email protected]>
AuthorDate: Wed Sep 6 19:18:18 2023 +0800
[improve][pip] PIP-297: Support terminating Function & Connector with the
fatal exception (#21079)
### Motivation
This PIP is to improve the current function exception handler. It will be
applied to both Pulsar Function and Pulsar IO Connector.
For more details, please refer to `pip-297.md`
---
pip/pip-297.md | 213 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 213 insertions(+)
diff --git a/pip/pip-297.md b/pip/pip-297.md
new file mode 100644
index 00000000000..2985864beed
--- /dev/null
+++ b/pip/pip-297.md
@@ -0,0 +1,213 @@
+# Title: Support terminating Function & Connector with the fatal exception
+
+# Background knowledge
+
+The **Pulsar Function** is a serverless computing framework that runs on top
of Pulsar and processes messages.
+
+The **Pulsar IO Connector** is a framework that allows users to easily
integrate Pulsar with external systems, such as
+databases, messaging systems, and data pipelines. With Pulsar IO Connector,
you can create, deploy, and manage
+connectors that read data from or write data to Pulsar topics. There are two
types of Pulsar IO Connectors: source and
+sink. A **source connector** imports data from another system to Pulsar, while
a **sink connector** exports data from
+Pulsar to another system. The Pulsar IO Connector is implemented based on the
Pulsar Function framework. So in
+the following, we treat the connector as a special kind of function. The
`function` refers to both function and
+connector.
+
+**Function Instance** is a running instance of a Pulsar IO Connector that
interacts with a specific external system or a
+Pulsar Function that processes messages from the topic.
+
+**Function Framework** is a framework for running the Function instance.
+
+**Function Context** is an interface that provides access to various
information and resources for the connector or the
+function. The function context is passed to the connector or the function when
it is initialized, and then can be used
+to interact with the Pulsar system.
+
+## The current implementation of the exception handler
+
+**Function instance thread**: The function framework initializes a thread for
each function instance to handle the
+core logic of the function/connector, including consuming messages from the
Pulsar topic for the sink connector,
+executing the logic of the function, producing messages to the Pulsar topic
for the source connector, handling the
+exception, etc. And let's define the **Connector thread/Function thread** as a
thread that is created by the connector
+or function itself.
+
+**Exception handling logic**: The function itself can throw exceptions, and
this thread will catch the exception and
+then close the function. This means that the function will stop working until
it is restarted manually or
+automatically by the function framework.
+
+Even though it is not explicitly defined, there are two types of exceptions
that should be handled by the function or
+the framework:
+
+- **Fatal exception**: This is an exception that the function cannot recover
from by itself and needs to notify the
+ framework to terminate it. These are fatal exceptions that indicate a
configuration issue, a logic error, or an
+ incompatible system. The function framework will catch these exceptions,
report them to users, and terminate the
+ function.
+- **Non-fatal exception**: This is an exception that does not require the function instance to be terminated. It can be
+ handled by the connector or function itself, or thrown by the function. This exception won't cause the function
+ instance to be terminated.
+
+### How to handle exceptions thrown from connectors
+
+All the exceptions thrown from the connector are treated as fatal exceptions.
+
+If the exception is thrown from the function instance thread, the function
framework will catch the exception and
+terminate the function instance.
+
+If the exception is thrown from the connector thread that is created by the connector itself, the function framework
+can neither catch the exception nor terminate the function instance. The connector will hang forever.
+The `Motivation` part will talk more about this case.
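
The hang described above follows from how Java threads work: an exception thrown on a thread that the connector created never reaches a `try`/`catch` on the thread that started it. The following is a minimal, framework-free sketch of that behavior. The class name `UncaughtWorkerDemo` and the thread name `connector-worker` are made up for illustration and are not part of Pulsar.

```java
public class UncaughtWorkerDemo {

    /**
     * Starts a worker thread that throws, and reports whether the
     * try/catch on the calling thread observed that exception.
     */
    static boolean callerSeesWorkerFailure() throws InterruptedException {
        boolean caught = false;
        try {
            Thread worker = new Thread(() -> {
                // Simulates a fatal error raised on a connector-owned thread.
                throw new IllegalStateException("external system rejected the batch");
            }, "connector-worker");
            // Suppress the default stack trace; this handler runs on the worker thread.
            worker.setUncaughtExceptionHandler((thread, error) -> { });
            worker.start();
            worker.join();
        } catch (IllegalStateException e) {
            // Never reached: the exception stays on the worker thread.
            caught = true;
        }
        return caught;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("caller saw failure: " + callerSeesWorkerFailure());
    }
}
```

Running `main` prints `caller saw failure: false`. The calling thread, which plays the role of the function instance thread here, keeps running as if nothing had happened.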
+
+If the exception is thrown from the external system, the connector
implementation could treat it as a retryable
+exception and retry to process the message later, or throw it to indicate it
as a fatal exception.
+
+### How to handle exceptions thrown from functions
+
+All the exceptions thrown from the Pulsar Function are treated as non-fatal exceptions. The function framework will
+catch the exception and log it, but it will not terminate the function instance.
+
+There is no way for the function developer to throw a fatal exception to the
function framework to terminate the
+function instance.
+
+# Motivation
+
+Currently, the connector and function cannot terminate the function instance when a fatal exception is thrown
+outside the function instance thread, because the current exception handler only sees exceptions raised on that thread.
+
+For example, suppose we have a sink connector that uses its own threads to
batch-sink the data to an external system. If
+any fatal exceptions occur in those threads, the function instance thread will
not be aware of them and will
+not be able to terminate the connector. This will cause the connector to hang
indefinitely. There is a related issue
+here: https://github.com/apache/pulsar/issues/9464
+
+The same problem exists for the source connector. The source connector may also use a separate thread to fetch data from
+an external system. If any fatal exceptions happen in that thread, the connector will also hang forever. This issue has
+been observed for the Kafka source connector: https://github.com/apache/pulsar/issues/9464. We have fixed it by adding
+the `notifyError` method to the `PushSource` class in PIP-281: https://github.com/apache/pulsar/pull/20807. However, this
+does not solve the problem for all source connectors, because not all of them are implemented based on
+the `PushSource` class.
+
+The problem is the same for the Pulsar Function. Currently, the function can't throw fatal exceptions to the function
+framework. We need to provide a way for the function developer to do so.
+
+We need a way for the connector and function developers to throw fatal
exceptions outside the function instance
+thread. The function framework should catch these exceptions and terminate the
function accordingly.
+
+# Goals
+
+## In Scope
+
+- Support terminating the function instance with fatal exceptions
+- Apply this proposal to both the Pulsar Function and the Pulsar IO Connector
+
+## Out of Scope
+
+- The fixes of the exception-raising issue mentioned in the Motivation part
for all the connectors are not included in
+ this PIP. This PIP only provides the feature for the connector developer to
terminate the function instance. The fixes
+ should be in several different PRs.
+
+# High Level Design
+
+Introduce a new method `fatal` to the context. All the connector
implementation code and the function code
+can use this context and call the `fatal` method to terminate the instance
while raising a fatal exception.
+
+After the connector or function raises the fatal exception, the function instance thread will be interrupted.
+The function framework can then catch the exception, log it, and terminate the function instance.
+
+# Detailed Design
+
+## Design & Implementation Details
+
+This PIP proposes to add a new method `fatal` to the context `BaseContext`. This method allows the connector or the
+function code to report a fatal exception to the function framework and terminate the instance. `SinkContext`
+and `SourceContext` both extend `BaseContext`, so all sink connectors and source connectors can
+invoke this new method. The Pulsar Function context class `Context` also extends `BaseContext`, so the
+function code can invoke this new method as well.
+
+In the `fatal` method, the function instance thread will be interrupted. The
function instance thread can then
+catch the interrupt exception and get the fatal exception. The function
framework then logs this exception,
+reports to the metrics, and finally terminates the function instance.
+
+The behavior when invoking the `fatal` method:
+
+- For the connector thread or function thread:
+ - Invoke the `fatal` method
+ - Send the exception to the function framework. There is a field
`deathException` in the
+ class `JavaInstanceRunnable` that is used to store the fatal exception.
+ - Interrupt the function instance thread
+- For the function instance thread:
+ - Catch the interrupt exception
+ - Get the exception from the function framework
+ - Report the log and metrics
+ - Close the function instance
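
The two sides of the sequence above can be sketched in plain Java. This is an illustrative sketch only, not the actual `JavaInstanceRunnable` code: the `InstanceRunner` class, its methods, and the `CountDownLatch` that stands in for the blocking message loop are assumptions. Only the `deathException` field name comes from this proposal.

```java
import java.util.concurrent.CountDownLatch;

public class FatalInterruptSketch {

    /** Hypothetical stand-in for the framework side of a function instance. */
    static final class InstanceRunner {
        // Written by the connector/function thread, read by the instance thread.
        private volatile Throwable deathException;
        private final Thread instanceThread;

        InstanceRunner(Thread instanceThread) {
            this.instanceThread = instanceThread;
        }

        /** Called from a connector or function thread. */
        void fatal(Throwable t) {
            deathException = t;          // 1. hand the exception to the framework
            instanceThread.interrupt();  // 2. wake up the function instance thread
        }

        Throwable getDeathException() {
            return deathException;
        }
    }

    /** Plays the function instance thread and returns the fatal exception it picked up. */
    static Throwable runAndFail() throws InterruptedException {
        InstanceRunner runner = new InstanceRunner(Thread.currentThread());
        Thread worker = new Thread(
                () -> runner.fatal(new IllegalStateException("bad sink configuration")),
                "connector-worker");
        worker.start();
        try {
            // Stands in for the blocking message-processing loop; nobody counts down.
            new CountDownLatch(1).await();
            return null; // not reached in this sketch
        } catch (InterruptedException e) {
            // Catch the interrupt, then fetch the stored fatal exception.
            return runner.getDeathException();
        } finally {
            worker.join();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Throwable fatal = runAndFail();
        // A real framework would log, update metrics, and close the instance here.
        System.out.println("terminating instance: " + fatal.getMessage());
    }
}
```

In this sketch the exception is stored before the interrupt is sent, so the instance thread always finds it once it wakes up.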
+
+## Public-facing Changes
+
+### Public API
+
+Introduce `fatal` method to the `BaseContext`:
+
+```java
+public interface BaseContext {
+ /**
+ * Terminate the function instance with a fatal exception.
+ *
+ * @param t the fatal exception to be raised
+ */
+ void fatal(Throwable t);
+}
+```
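
As a usage illustration, a sink that flushes batches on its own thread could forward failures through `fatal` roughly as follows. This sketch does not depend on Pulsar: `MiniContext` is a local stand-in that mirrors only the `fatal(Throwable)` signature shown above, and `BatchingSink`, `flushAsync`, and `shutdownAndWait` are hypothetical names.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class BatchSinkFatalExample {

    /** Local stand-in for the single method this proposal adds to the context. */
    interface MiniContext {
        void fatal(Throwable t);
    }

    /** Hypothetical sink that writes batches on its own thread. */
    static final class BatchingSink {
        private final ExecutorService flusher = Executors.newSingleThreadExecutor();
        private final MiniContext context;

        BatchingSink(MiniContext context) {
            this.context = context;
        }

        void flushAsync(Runnable writeBatch) {
            flusher.execute(() -> {
                try {
                    writeBatch.run();
                } catch (Throwable t) {
                    // Without this call the failure would stay on the flusher thread.
                    context.fatal(t);
                }
            });
        }

        void shutdownAndWait() throws InterruptedException {
            flusher.shutdown();
            flusher.awaitTermination(5, TimeUnit.SECONDS);
        }
    }

    /** Runs one failing flush and returns what the context was told. */
    static Throwable demo() throws InterruptedException {
        AtomicReference<Throwable> reported = new AtomicReference<>();
        BatchingSink sink = new BatchingSink(reported::set);
        sink.flushAsync(() -> {
            throw new IllegalArgumentException("table does not exist");
        });
        sink.shutdownAndWait();
        return reported.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("reported to context: " + demo());
    }
}
```

In a real connector the context would be the `SinkContext` or `SourceContext` passed in by the framework, instead of the recording stub used here.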
+
+### Binary protocol
+
+No changes for this part.
+
+### Configuration
+
+No changes for this part.
+
+### CLI
+
+No changes for this part.
+
+### Metrics
+
+No changes for this part.
+
+# Monitoring
+
+No changes for this part.
+
+# Security Considerations
+
+No security-related changes.
+The new method `fatal` will only take effect on the current function instance. It won't affect other function instances
+even if they are in the same function worker.
+
+# Backward & Forward Compatibility
+
+## Revert
+
+No operation required.
+
+## Upgrade
+
+No operation required.
+
+# Alternatives
+
+## Using futures to handle results or exceptions returned by the connector
+
+The benefit of this solution is that it makes the use of exception throwing
more intuitive to the connector developer.
+
+But it requires changes to existing interfaces, including `Source` and `Sink`,
which can complicate connector
+development. And we still need the `fatal` method to handle some cases such as
terminating the instance in code outside
+of the message processing logic. This alternative solution can't handle this
case.
+
+Meanwhile, the implementation of this solution will also be more complex,
involving changes to the core message
+processing logic of the function framework. We need to turn the entire message
processing logic into an asynchronous
+pattern.
+
+# General Notes
+
+# Links
+
+* Mailing List discussion thread:
https://lists.apache.org/thread/j59gzzwjp8c48lwv5poddm9qzlp2hol0
+* Mailing List voting thread:
https://lists.apache.org/thread/ggok3c2601mnbdomr65v3pjth3lk6fr8