[ https://issues.apache.org/jira/browse/KAFKA-3190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15823583#comment-15823583 ]
Jiangjie Qin commented on KAFKA-3190:
-------------------------------------

[~ewencp] This remains an issue and confuses some applications from time to time. We had some discussion on the PR. The main reason we fire the callback in producer.send() is that this function can throw API exceptions and, generally speaking, all ApiExceptions are supposed to be handled by the callback. Here the two possible ApiExceptions are {{RecordTooLargeException}} and {{TimeoutException}}.

There are actually two issues.

1. We throw {{RecordTooLargeException}} in the producer directly once the serialized size of the ProducerRecord is greater than maxRequestSize or totalMemorySize. This is a little weird because {{RecordTooLargeException}} is an API exception and ideally should only be returned by the broker. Technically speaking, the serialized bytes could compress extremely well and end up smaller than the max message size limit, so we are essentially throwing an API exception based on our own estimate. Maybe it is better to throw IllegalArgumentException instead.

2. We are misusing o.a.k.common.errors.TimeoutException in many places, including the producer, the consumer and the broker. This exception was originally mapped to error code 7 in the ProduceResponse, indicating that replication did not finish within the timeout specified in the ProduceRequest when acks=-1. Now it is used everywhere in the producer and consumer to indicate any kind of timeout, regardless of whether it was returned by the broker. Similarly, on the broker side, it is used by DelayedCreateTopic and DelayedDeleteTopic. I think this is a serious problem and has to be fixed. Unfortunately it will require an API change. I will probably open another ticket and a KIP.
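To illustrate issue 1, here is a minimal, self-contained sketch. It is not the actual KafkaProducer code: the class name, the MAX_REQUEST_SIZE constant and both helper methods are hypothetical, and java.util.zip.Deflater merely stands in for whatever compression codec the producer is configured with. It shows a size check on the uncompressed serialized bytes rejecting a payload that would compress far below the same limit, and it throws IllegalArgumentException as suggested above.

```java
import java.util.zip.Deflater;

// Hypothetical stand-in for the producer's client-side size check.
final class RecordSizeCheckSketch {
    // Assumed limit, playing the role of maxRequestSize.
    static final int MAX_REQUEST_SIZE = 1024 * 1024;

    // Checks the *uncompressed* serialized size, so the result is only an
    // estimate of what would actually be sent to the broker.
    static void ensureValidRecordSize(int serializedSize) {
        if (serializedSize > MAX_REQUEST_SIZE) {
            throw new IllegalArgumentException("Serialized record is " + serializedSize
                    + " bytes, which is larger than the limit of " + MAX_REQUEST_SIZE + " bytes");
        }
    }

    // Returns the number of bytes the payload occupies after DEFLATE compression.
    static int compressedSize(byte[] payload) {
        Deflater deflater = new Deflater();
        deflater.setInput(payload);
        deflater.finish();
        byte[] buffer = new byte[8192];
        int total = 0;
        while (!deflater.finished()) {
            total += deflater.deflate(buffer);
        }
        deflater.end();
        return total;
    }

    public static void main(String[] args) {
        // An all-zero payload twice the limit: highly compressible.
        byte[] payload = new byte[2 * MAX_REQUEST_SIZE];
        System.out.println("serialized bytes: " + payload.length);
        System.out.println("compressed bytes: " + compressedSize(payload));
        try {
            ensureValidRecordSize(payload.length);
        } catch (IllegalArgumentException e) {
            // Rejected on the client even though the compressed form fits.
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Because the check is an argument validation performed before anything reaches the broker, a caller would see it as a plain exception thrown from send() rather than as an error delivered through the callback.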
The simplest solution I can think of now is to create a new {{RequestTimeoutException}} extending ApiException and map it to error code 7. (We can discuss whether we want to split that into two separate exceptions, ReplicationTimeoutException and RequestTimeoutException, but that can be done later.) Then change the current TimeoutException to extend directly from KafkaException. This way we do not need to change the massive usage of TimeoutException.

To summarize, the solution I am thinking of is the following:

1. Throw IllegalArgumentException instead of RecordTooLargeException when the serialized record size is greater than maxRequestSize or totalMemorySize.
2. Add a new RequestTimeoutException extending ApiException and map that new exception to error code 7.
3. Change the current o.a.k.common.errors.TimeoutException to extend KafkaException.
4. Remove the ApiException handling logic in producer.send().

Given that the 0.10.2.0 release has already cut off KIPs, I am not sure if you want to get it into 0.10.2.0. We can probably get the fix into 0.10.2.0 if people generally agree on the way to fix it; otherwise we may have to delay it to the next release.

[~guozhang] [~ijuma] [~junrao] Since you were reviewing the original PR, what do you think about the above solution? [~hachikuji] It would also be good if you could comment on whether you have any concerns.

Thanks.

> KafkaProducer should not invoke callback in send()
> --------------------------------------------------
>
>                 Key: KAFKA-3190
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3190
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, producer
>    Affects Versions: 0.9.0.0
>            Reporter: Jiangjie Qin
>            Assignee: Jiangjie Qin
>            Priority: Critical
>             Fix For: 0.10.2.0
>
>
> Currently KafkaProducer will invoke callback.onComplete() if it receives an
> ApiException during send(). This breaks the guarantee that callback will be
> invoked in order. It seems ApiException in send() only comes from metadata
> refresh. If so, we can probably simply throw it instead of invoking
> callback().

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)