[ https://issues.apache.org/jira/browse/KAFKA-3190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15823583#comment-15823583 ]

Jiangjie Qin commented on KAFKA-3190:
-------------------------------------

[~ewencp] This remains an issue and confuses some of the applications from time 
to time. 

We had some discussion on the PR. The main reason we fire the callback in 
producer.send() is that this method can throw API exceptions, and, generally 
speaking, all ApiExceptions are supposed to be handled by the callback. Here 
the two possible ApiExceptions are RecordTooLargeException and 
TimeoutException.
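To make the ordering concern concrete, here is a minimal sketch of the pattern 
under discussion. It does not use the real client: {{ToyProducer}}, 
{{ToyCallback}} and {{ToyApiException}} are hypothetical stand-ins, and the 
length check is just an easy way to raise an API exception inside send(). 
When send() catches the exception and fires the callback synchronously, that 
callback runs before the callbacks of earlier records that are still in flight.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical stand-in for o.a.k.common.errors.ApiException.
class ToyApiException extends RuntimeException {
    ToyApiException(String msg) { super(msg); }
}

// Hypothetical stand-in for the producer callback interface.
interface ToyCallback {
    void onCompletion(String value, Exception e);
}

// Toy producer mimicking the pattern in this ticket: an API exception raised
// inside send() is passed to the callback immediately instead of being thrown.
class ToyProducer {
    private final int maxRecordSize;
    // Records accepted for sending, completed later by the simulated I/O thread.
    private final Deque<String> inFlight = new ArrayDeque<>();
    private final Deque<ToyCallback> inFlightCallbacks = new ArrayDeque<>();

    ToyProducer(int maxRecordSize) { this.maxRecordSize = maxRecordSize; }

    void send(String value, ToyCallback cb) {
        try {
            if (value.length() > maxRecordSize) {
                throw new ToyApiException("record too large: " + value);
            }
            inFlight.addLast(value);
            inFlightCallbacks.addLast(cb);
        } catch (ToyApiException e) {
            // Fired synchronously, ahead of earlier records still in flight.
            cb.onCompletion(value, e);
        }
    }

    // Simulates the sender thread completing all in-flight records in order.
    void flush() {
        while (!inFlight.isEmpty()) {
            inFlightCallbacks.removeFirst().onCompletion(inFlight.removeFirst(), null);
        }
    }
}

class CallbackOrderDemo {
    static List<String> run() {
        List<String> order = new ArrayList<>();
        ToyProducer producer = new ToyProducer(5);
        ToyCallback cb = (value, e) -> order.add(value + (e == null ? ":ok" : ":failed"));
        producer.send("a", cb);        // accepted, completes later
        producer.send("toolong", cb);  // rejected, callback fires right now
        producer.send("b", cb);        // accepted, completes later
        producer.flush();
        return order;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Running {{main}} should print {{[toolong:failed, a:ok, b:ok]}}: the records were 
sent as a, toolong, b, but the failure is reported first.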

There are actually two issues.
1. We throw {{RecordTooLargeException}} in the producer directly when the 
serialized size of the ProducerRecord is greater than maxRequestSize or 
totalMemorySize. This is a little weird because {{RecordTooLargeException}} is 
an API exception and ideally should only be returned by the broker. Technically 
speaking, the serialized bytes could compress extremely well and end up below 
the max message size limit, so we are essentially throwing an API exception 
based on an estimate. It may be better to throw IllegalArgumentException 
instead.

2. We are actually misusing o.a.k.common.errors.TimeoutException in many 
places, including the producer, the consumer and the broker. This exception was 
originally mapped to error code 7 in the ProduceResponse, indicating that 
replication did not finish within the timeout specified in the ProduceRequest 
when acks=-1. Now it is used everywhere in the producer and consumer to 
indicate any kind of timeout, regardless of whether it was returned from the 
broker. Similarly, on the broker side, it is used by DelayedCreateTopic and 
DelayedDeleteTopic. I think this is a serious problem that has to be fixed. 
Unfortunately, fixing it requires an API change, so I will probably open 
another ticket and a KIP. The simplest solution I can think of now is to create 
a new {{RequestTimeoutException}} extending ApiException and map it to error 
code 7 (we can discuss whether to split it into two separate exceptions, 
ReplicationTimeoutException and RequestTimeoutException, but that can be done 
later), and to change the current TimeoutException to extend KafkaException 
directly. This way we do not need to change the many existing usages of 
TimeoutException.
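The hierarchy change described above could be sketched roughly as follows. 
These are simplified, hypothetical stand-ins nested in a local 
{{ErrorsSketch}} class, not the real o.a.k.common.errors types, and the 
error-code table and {{isBrokerError}} helper are illustrative only. The key 
property is that a generic client-side timeout is no longer an ApiException, 
while the broker's error code 7 maps to the new {{RequestTimeoutException}}.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-ins mirroring the proposal; not Kafka's real classes.
class ErrorsSketch {
    static class KafkaException extends RuntimeException {
        KafkaException(String msg) { super(msg); }
    }

    // Errors that are part of the protocol, i.e. returned by the broker.
    static class ApiException extends KafkaException {
        ApiException(String msg) { super(msg); }
    }

    // Proposed: broker-side timeout (e.g. replication with acks=-1), error code 7.
    static class RequestTimeoutException extends ApiException {
        RequestTimeoutException(String msg) { super(msg); }
    }

    // Proposed: the generic timeout extends KafkaException directly, so it is
    // no longer treated as an error returned by the broker.
    static class TimeoutException extends KafkaException {
        TimeoutException(String msg) { super(msg); }
    }

    // Hypothetical error-code table; only code 7 matters for this discussion.
    static final Map<Integer, Class<? extends ApiException>> ERROR_CODES = new HashMap<>();
    static {
        ERROR_CODES.put(7, RequestTimeoutException.class);
    }

    // Hypothetical helper: only ApiExceptions count as broker-returned errors.
    static boolean isBrokerError(Exception e) {
        return e instanceof ApiException;
    }
}
```

Because {{TimeoutException}} keeps its name and still extends KafkaException in 
this sketch, existing {{catch (TimeoutException e)}} blocks would still compile; 
only code that relies on it being an ApiException would behave differently.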

To summarize, the solution I have in mind is the following:
1. Throw IllegalArgumentException instead of RecordTooLargeException when the 
serialized record size is greater than maxRequestSize or totalMemorySize.
2. Add a new RequestTimeoutException extending ApiException and map that new 
exception to error code 7.
3. Change the current o.a.k.common.errors.TimeoutException to extend 
KafkaException.
4. Remove the ApiException handling logic in producer.send().
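With steps 1 and 4 applied, send() would fail fast on the client-side size 
estimate and leave all callback invocation to the completion path. The sketch 
below uses a hypothetical {{ProposedToyProducer}} with a plain {{BiConsumer}} as 
the callback; it is not the real KafkaProducer API, and the string-length check 
stands in for the serialized (pre-compression) size check.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.BiConsumer;

// Hypothetical toy producer reflecting steps 1 and 4 of the proposal:
// send() throws synchronously and never invokes the callback itself.
class ProposedToyProducer {
    private final int maxRequestSize;
    private final Deque<String> pending = new ArrayDeque<>();
    private final Deque<BiConsumer<String, Exception>> callbacks = new ArrayDeque<>();

    ProposedToyProducer(int maxRequestSize) { this.maxRequestSize = maxRequestSize; }

    void send(String value, BiConsumer<String, Exception> callback) {
        // Step 1: a client-side size estimate fails fast with IllegalArgumentException.
        if (value.length() > maxRequestSize) {
            throw new IllegalArgumentException("record too large: " + value);
        }
        // Step 4: no ApiException handling here; the callback is only queued.
        pending.addLast(value);
        callbacks.addLast(callback);
    }

    // Simulates the I/O thread completing records; callbacks fire in send order.
    void completeAll() {
        while (!pending.isEmpty()) {
            callbacks.removeFirst().accept(pending.removeFirst(), null);
        }
    }
}

class ProposedOrderDemo {
    static List<String> run() {
        List<String> events = new ArrayList<>();
        ProposedToyProducer producer = new ProposedToyProducer(5);
        BiConsumer<String, Exception> cb =
                (value, e) -> events.add(value + (e == null ? ":ok" : ":failed"));
        producer.send("a", cb);
        try {
            producer.send("toolong", cb);
        } catch (IllegalArgumentException ex) {
            events.add("toolong:thrown");  // the caller sees the error immediately
        }
        producer.send("b", cb);
        producer.completeAll();
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Running {{main}} should print {{[toolong:thrown, a:ok, b:ok]}}: the oversized 
record is reported to the caller right away, and every callback that does fire 
still fires in send order.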

Given that the KIP deadline for the 0.10.2.0 release has already passed, I am 
not sure whether you want this in 0.10.2.0. We can probably get the fix into 
0.10.2.0 if people generally agree on the approach; otherwise we may have to 
delay it to the next release.

[~guozhang] [~ijuma] [~junrao] Since you reviewed the original PR, what do you 
think about the above solution? [~hachikuji] It would also be good if you could 
comment and let us know whether you have any concerns.

Thanks.

> KafkaProducer should not invoke callback in send()
> --------------------------------------------------
>
>                 Key: KAFKA-3190
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3190
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, producer 
>    Affects Versions: 0.9.0.0
>            Reporter: Jiangjie Qin
>            Assignee: Jiangjie Qin
>            Priority: Critical
>             Fix For: 0.10.2.0
>
>
> Currently KafkaProducer will invoke callback.onCompletion() if it receives an 
> ApiException during send(). This breaks the guarantee that callbacks will be 
> invoked in order. It seems ApiException in send() only comes from metadata 
> refresh. If so, we can probably simply throw it instead of invoking the 
> callback.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
