[ 
https://issues.apache.org/jira/browse/KAFKA-14138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17576529#comment-17576529
 ] 

Guozhang Wang commented on KAFKA-14138:
---------------------------------------

[~sagarrao] Yes, I think this is highly related to KIP-691 (see "Callback 
Exception Improvement" and "Unify Wrapped KafkaException").

My point is that if we wrap all directly thrown exceptions in a 
`KafkaException`, then we should consider doing the same for exceptions passed 
through the future/callback as well. We could perhaps use a different wrapping 
exception class to align with KIP-691. We also need to decide which exceptions 
should be thrown directly. The principles are:

1. If EOS is not enabled, produce delivery failures would only be set in the 
future/callback, since they are not fatal; fatal exceptions should be thrown 
directly, and the user thread should handle them by closing the producer.
2. If EOS is enabled, even non-fatal produce delivery failures should be thrown 
directly and handled immediately, since otherwise we'd be violating the 
semantics. However, depending on the exception, the caller thread can decide 
whether the producer should be closed or whether it can still abort the 
transaction and then move on.
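
The two principles above can be read as a small decision table. The sketch 
below is only one interpretation of them, not producer code: `ErrorRouting`, 
`Channel`, `Action` and the `fatal` flag are hypothetical names introduced for 
illustration, and how an exception gets classified as fatal is left open.

```java
public class ErrorRouting {
    // Where the error surfaces (hypothetical names, not Kafka API).
    public enum Channel { THROWN_FROM_SEND, FUTURE_OR_CALLBACK }

    // What the caller thread is expected to do about it.
    public enum Action { CLOSE_PRODUCER, ABORT_TRANSACTION, HANDLE_IN_CALLBACK }

    // Principle 1: without EOS, only fatal errors are thrown directly.
    // Principle 2: with EOS, non-fatal delivery failures are thrown directly too.
    public static Channel channelFor(boolean eosEnabled, boolean fatal) {
        return (eosEnabled || fatal) ? Channel.THROWN_FROM_SEND
                                     : Channel.FUTURE_OR_CALLBACK;
    }

    // Fatal errors mean closing the producer; with EOS, a non-fatal error
    // can be handled by aborting the transaction and moving on.
    public static Action actionFor(boolean eosEnabled, boolean fatal) {
        if (fatal) {
            return Action.CLOSE_PRODUCER;
        }
        return eosEnabled ? Action.ABORT_TRANSACTION : Action.HANDLE_IN_CALLBACK;
    }
}
```

Under this reading the two channels never overlap for a given configuration: 
a non-fatal failure shows up in exactly one place depending on whether EOS is 
enabled.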

> The Exception Throwing Behavior of Transactional Producer is Inconsistent
> -------------------------------------------------------------------------
>
>                 Key: KAFKA-14138
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14138
>             Project: Kafka
>          Issue Type: Improvement
>          Components: producer 
>            Reporter: Guozhang Wang
>            Assignee: Sagar Rao
>            Priority: Critical
>
> Error throwing inside the Kafka producer is inconsistent when 
> transactions are enabled. In short, there are two places where the error 
> code received from the brokers would eventually be thrown to the caller:
> * Recorded on the batch's metadata, via "Sender#failBatch"
> * Recorded on the txn manager, via "txnManager#handleFailedBatch".
> The former would be thrown from 1) the `Future<RecordMetadata>` returned from 
> `send`; or 2) the `callback` passed to `send(record, callback)`. The latter, 
> in contrast, would be thrown directly from `producer.send()`, in which we 
> call `txnManager.maybeAddPartition -> maybeFailWithError`. However, when 
> thrown from the former, the exception is not wrapped, so the caller sees the 
> raw exception (e.g. ClusterAuthorizationException), whereas in the latter 
> it's wrapped, e.g. as KafkaException(ClusterAuthorizationException). Which 
> one is thrown depends on a race condition, since we cannot control whether 
> the previous produceRequest's error has been sent back by the time the 
> caller thread calls `txnManager.maybeAddPartition`.
> For example, consider the following sequence:
> 1. caller thread: within future = producer.send(), call 
> recordAccumulator.append
> 2. sender thread: drain the accumulator, send the produceRequest and get the 
> error back.
> 3. caller thread: within future = producer.send(), call 
> txnManager.maybeAddPartition
> 4. sender thread: get the addPartition token, send the txnRequest and get the 
> error back. NOTE the sender thread could send these two requests in any order.
> 5. caller thread: future.get()
> In a sequence where 3) happens before 2), we would only get the raw 
> exception at step 5; in a sequence where 2) happens before 3), we would 
> throw the exception immediately at 3).
> This inconsistent error throwing is a burden for users, since they 
> need to handle both cases, and many of them are not aware of this 
> subtlety. We should make the error throwing consistent. For example, we 
> should consider: 1) which errors would be thrown from callback / future.get 
> and which would be thrown from the `send` call directly, ideally with no 
> overlap between the two sets; and 2) whether we should wrap the raw error 
> or not, which we should do consistently.
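
The race in the issue description above can be replayed deterministically. 
The following is a rough single-threaded model, not the real producer: 
`SendRaceSketch`, `WrapperException` and `AuthException` are hypothetical 
stand-ins (for the producer, `KafkaException` and 
`ClusterAuthorizationException` respectively), and a boolean flag takes the 
place of the actual thread interleaving.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class SendRaceSketch {
    // Stand-in for KafkaException (hypothetical, not the real class).
    static class WrapperException extends RuntimeException {
        WrapperException(Throwable cause) { super(cause); }
    }

    // Stand-in for ClusterAuthorizationException (hypothetical).
    static class AuthException extends RuntimeException {
        AuthException() { super("not authorized"); }
    }

    // Error recorded on the modelled transaction state by the "sender".
    private RuntimeException txnError;

    // Sender side: fail the batch's future with the raw error and record
    // the same error on the transaction state.
    private void failBatch(CompletableFuture<String> future, RuntimeException error) {
        future.completeExceptionally(error);
        txnError = error;
    }

    static String describe(Throwable t) {
        String name = t.getClass().getSimpleName();
        return t.getCause() == null
                ? name
                : name + "(" + t.getCause().getClass().getSimpleName() + ")";
    }

    // Caller side of send(); the flag picks which interleaving is replayed.
    public String send(boolean errorArrivesBeforeAddPartition) {
        CompletableFuture<String> future = new CompletableFuture<>(); // step 1
        if (errorArrivesBeforeAddPartition) {
            failBatch(future, new AuthException());                   // step 2 before 3
        }
        if (txnError != null) {                                       // step 3
            // Models maybeFailWithError: the error is thrown wrapped.
            return "send() threw " + describe(new WrapperException(txnError));
        }
        if (!errorArrivesBeforeAddPartition) {
            failBatch(future, new AuthException());                   // step 2 after 3
        }
        try {
            future.get();                                             // step 5
            return "ok";
        } catch (ExecutionException e) {
            return "future.get() threw " + describe(e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }
}
```

In this model, `new SendRaceSketch().send(true)` reports the wrapped error 
from the `send` call itself, while `new SendRaceSketch().send(false)` reports 
the raw error from `future.get()`, which is the inconsistency the issue 
describes.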



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
