Guozhang Wang created KAFKA-10829:
-------------------------------------
Summary: Kafka Streams handle produce exception improvement
Key: KAFKA-10829
URL: https://issues.apache.org/jira/browse/KAFKA-10829
Project: Kafka
Issue Type: Improvement
Components: producer, streams
Reporter: Guozhang Wang
A summary of some recent discussions on how we should improve the embedded
producer's exception handling.
Note that the baseline logic below guarantees that our correctness
semantics are not violated; the optimizations sit on top of the baseline to
reduce the user's burden by letting the library auto-handle certain types of
exceptions.
1) ``Producer.send()`` throws an exception directly:
1.a) baseline logic (to ensure correctness) is to always wrap the exception as a
StreamsException. This causes the thread to shut down and the exception handler
to be triggered. The handler can inspect the wrapped exception and decide whether
the shut-down thread can be restarted.
1.b) optimization is to inspect the exception and decide whether it can be
wrapped as a TaskMigratedException instead (e.g. ProducerFenced). This would then
be auto-handled by lost-all-tasks and re-join.
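A minimal sketch of the decision in item 1, assuming ProducerFenced is the only exception promoted to TaskMigratedException. The nested exception classes are local stand-ins that only mirror the Kafka class names so the sketch compiles on its own, and ``translate`` is a hypothetical helper, not an existing Streams method:

```java
// Stand-in exception types so the sketch compiles without Kafka on the classpath.
// They mirror the names KafkaException, ProducerFencedException,
// StreamsException and TaskMigratedException; they are not the real classes.
class SendThrowSketch {
    static class KafkaException extends RuntimeException {
        KafkaException(String message) { super(message); }
    }
    static class ProducerFencedException extends KafkaException {
        ProducerFencedException(String message) { super(message); }
    }
    static class StreamsException extends RuntimeException {
        StreamsException(String message, Throwable cause) { super(message, cause); }
    }
    static class TaskMigratedException extends StreamsException {
        TaskMigratedException(String message, Throwable cause) { super(message, cause); }
    }

    // Hypothetical helper: decide what to rethrow when send() itself throws.
    static StreamsException translate(KafkaException thrown) {
        if (thrown instanceof ProducerFencedException) {
            // 1.b) can be handled by giving up all tasks and re-joining
            return new TaskMigratedException("producer was fenced", thrown);
        }
        // 1.a) baseline: wrap everything else; the thread shuts down and the
        // exception handler can inspect getCause()
        return new StreamsException("error sending record", thrown);
    }
}
```

The original exception is always kept as the cause, so the handler mentioned in 1.a can still look at it.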
2) ``Producer.send()`` callback receives an exception:
2.a) baseline is to first check whether the exception is an instance of
RetriableException.
If it is retriable, pass it to the production exception handler to decide whether
to throw or to continue with the record dropped. If the decision is to throw, always wrap it
as a StreamsException and keep it locally; at the same time, do not send more
records from the caller. On the next send call, check the remembered exception
and throw it. This causes the thread to shut down and the exception handler to be
triggered.
If the exception is not retriable, always throw it as a fatal StreamsException.
2.b) optimization one: if the non-retriable exception can be translated to a
TaskMigratedException, then do not wrap it as a StreamsException, so that the
library can handle it internally.
2.c) optimization two: if the retriable exception is a timeout exception, then
do not pass it to the production exception handler; treat it as TaskMigrated instead.
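A rough sketch of item 2, reading 2.a as "retriable exceptions go to the handler, non-retriable ones are fatal" (the reading that agrees with 2.b and 2.c). The exception classes, the ``Handler`` interface, and the method names are stand-ins invented for the sketch, and ProducerFenced is only an assumed example for 2.b:

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

class CallbackSketch {
    // Stand-ins that mirror Kafka's exception names; not the real classes.
    static class RetriableException extends RuntimeException {
        RetriableException(String message) { super(message); }
    }
    static class TimeoutException extends RetriableException {
        TimeoutException(String message) { super(message); }
    }
    static class ProducerFencedException extends RuntimeException {
        ProducerFencedException(String message) { super(message); }
    }
    static class StreamsException extends RuntimeException {
        StreamsException(String message, Throwable cause) { super(message, cause); }
    }
    static class TaskMigratedException extends StreamsException {
        TaskMigratedException(String message, Throwable cause) { super(message, cause); }
    }

    // Hypothetical stand-in for the user-supplied handler.
    enum Response { CONTINUE, FAIL }
    interface Handler { Response handle(Exception exception); }

    private final Handler handler;
    private final AtomicReference<StreamsException> remembered = new AtomicReference<>();
    final AtomicInteger dropped = new AtomicInteger();

    CallbackSketch(Handler handler) { this.handler = handler; }

    // Called from the send callback; records the outcome but never throws.
    void onCompletion(Exception exception) {
        if (exception == null) return;
        if (exception instanceof TimeoutException) {
            // 2.c) skip the handler and treat as TaskMigrated
            remember(new TaskMigratedException("send timed out", exception));
        } else if (exception instanceof RetriableException) {
            // 2.a) the handler decides between failing and dropping the record
            if (handler.handle(exception) == Response.FAIL) {
                remember(new StreamsException("handler chose to fail", exception));
            } else {
                dropped.incrementAndGet();
            }
        } else if (exception instanceof ProducerFencedException) {
            // 2.b) assumed example of a non-retriable error the library can handle
            remember(new TaskMigratedException("producer was fenced", exception));
        } else {
            // 2.a) any other non-retriable error is fatal
            remember(new StreamsException("fatal send error", exception));
        }
    }

    private void remember(StreamsException exception) {
        remembered.compareAndSet(null, exception); // keep only the first failure
    }

    // Called by the caller before each send; rethrows the remembered failure.
    void checkBeforeSend() {
        StreamsException exception = remembered.get();
        if (exception != null) throw exception;
    }
}
```

The callback only stores the failure because it does not run on the caller's thread; the throw happens on the next send attempt, which is also what stops further records from being sent.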
3) ``Producer.XXXTxn`` APIs other than ``AbortTxn`` throw an exception directly:
3.a) baseline logic is to catch all KafkaExceptions except TimeoutException
and handle them as *TaskCorrupted* (which includes aborting the transaction, resetting
the state, and re-joining the group). TimeoutException is rethrown.
3.b) optimization: some exceptions can be handled as TaskMigrated, which is
handled in a lighter-weight way.
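Item 3 could be sketched as a small wrapper around each transactional call. The exception classes are stand-ins mirroring Kafka's names, ``guard`` is a hypothetical helper, and using ProducerFenced as the TaskMigrated case in 3.b is an assumption, since the text does not name which exceptions qualify:

```java
class TxnCallSketch {
    // Stand-ins that mirror Kafka's exception names; not the real classes.
    static class KafkaException extends RuntimeException {
        KafkaException(String message) { super(message); }
    }
    static class TimeoutException extends KafkaException {
        TimeoutException(String message) { super(message); }
    }
    static class ProducerFencedException extends KafkaException {
        ProducerFencedException(String message) { super(message); }
    }
    static class TaskCorruptedException extends RuntimeException {
        TaskCorruptedException(String message, Throwable cause) { super(message, cause); }
    }
    static class TaskMigratedException extends RuntimeException {
        TaskMigratedException(String message, Throwable cause) { super(message, cause); }
    }

    // Hypothetical wrapper for a transactional call other than abort,
    // e.g. beginning or committing a transaction.
    static void guard(Runnable txnCall) {
        try {
            txnCall.run();
        } catch (TimeoutException e) {
            throw e; // 3.a) timeouts are rethrown unchanged
        } catch (ProducerFencedException e) {
            // 3.b) assumed example of an error that only needs the lighter path
            throw new TaskMigratedException("producer was fenced", e);
        } catch (KafkaException e) {
            // 3.a) baseline: abort the transaction, reset state, re-join the group
            throw new TaskCorruptedException("transactional call failed", e);
        }
    }
}
```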
4) ``Producer.abortTxn`` throws an exception:
4.a) baseline logic is to catch all KafkaExceptions except TimeoutException
and treat them as fatal StreamsExceptions. TimeoutException is rethrown.
4.b) optimization: some exceptions can be ignored (e.g. invalidTxnTransition
means the abort did not succeed).
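The abort case differs from the other transactional calls only in its outcomes, as this sketch tries to show. All classes are local stand-ins; in particular ``InvalidTxnTransitionException`` is a hypothetical name taken from the "invalidTxnTransition" example above, not a class I can confirm exists in Kafka, and ``abortQuietly`` is likewise invented:

```java
class AbortTxnSketch {
    // Stand-ins that mirror Kafka's exception names; not the real classes.
    static class KafkaException extends RuntimeException {
        KafkaException(String message) { super(message); }
    }
    static class TimeoutException extends KafkaException {
        TimeoutException(String message) { super(message); }
    }
    // Hypothetical type standing in for the "invalidTxnTransition" case.
    static class InvalidTxnTransitionException extends KafkaException {
        InvalidTxnTransitionException(String message) { super(message); }
    }
    static class StreamsException extends RuntimeException {
        StreamsException(String message, Throwable cause) { super(message, cause); }
    }

    // Returns true if the abort completed, false if its failure was ignored.
    static boolean abortQuietly(Runnable abortCall) {
        try {
            abortCall.run();
            return true;
        } catch (TimeoutException e) {
            throw e; // baseline: timeouts are rethrown unchanged
        } catch (InvalidTxnTransitionException e) {
            return false; // optimization: the abort did not succeed, ignore it
        } catch (KafkaException e) {
            // baseline: anything else is fatal
            throw new StreamsException("fatal error aborting transaction", e);
        }
    }
}
```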