[ https://issues.apache.org/jira/browse/KAFKA-4515?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15735355#comment-15735355 ]
Rajini Sivaram commented on KAFKA-4515:
---------------------------------------

This is being addressed in KIP-91: https://cwiki.apache.org/confluence/display/KAFKA/KIP-91+Provide+Intuitive+User+Timeouts+in+The+Producer

> Async producer send not retrying on TimeoutException: Batch Expired
> -------------------------------------------------------------------
>
>                 Key: KAFKA-4515
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4515
>             Project: Kafka
>          Issue Type: Bug
>          Components: producer
>    Affects Versions: 0.9.0.1
>            Reporter: Di Shang
>
> We are testing broker failure resiliency. We have a cluster of 3 brokers and a topic with 5 partitions and 2 replicas; the replicas are evenly distributed, and every broker leads at least one partition. We use the code below to send messages continuously, then kill one of the brokers to see whether any messages are lost.
> {code:title=MyTest.java|borderStyle=solid}
> import java.util.Properties;
>
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.ProducerRecord;
>
> public class MyTest {
>
>     static volatile KafkaProducer<Void, String> producer;
>
>     public static void send(ProducerRecord<Void, String> record) {
>         producer.send(record, (metadata, exception) -> {
>             if (exception != null) {
>                 // handle exception with manual retry
>                 System.out.println("Error, resending...");
>                 exception.printStackTrace();
>                 try {
>                     Thread.sleep(100);
>                 } catch (InterruptedException e) {
>                     e.printStackTrace();
>                 }
>                 //send(record); // without this retry, messages would be lost
>             } else if (metadata != null) {
>                 System.out.println("Sent " + record);
>             } else {
>                 System.out.println("No exception and no metadata");
>             }
>         });
>     }
>
>     public static void main(String[] args) throws Exception {
>         Properties props = new Properties();
>         props.put("bootstrap.servers", "...");
>         // keys are always null here; StringSerializer tolerates null input
>         props.put("key.serializer",
>                 "org.apache.kafka.common.serialization.StringSerializer");
>         props.put("value.serializer",
>                 "org.apache.kafka.common.serialization.StringSerializer");
>         props.put("retries", "100000");
>         props.put("acks", "1");
>         props.put("request.timeout.ms", "1000");
>         producer = new KafkaProducer<>(props);
>         Long i = 1L;
>         while (true) {
>             ProducerRecord<Void, String> record =
>                     new ProducerRecord<>("my-topic", i.toString());
>             send(record);
>             Thread.sleep(100);
>             i++;
>         }
>     }
> }
> {code}
> What we found is that when *request.timeout.ms* is set to a small value such as 1000, killing a broker produces a few "TimeoutException: Batch Expired" errors in the send() callback. If we do not handle this with an explicit retry, as in the code above, those messages are lost.
> The documentation for *request.timeout.ms* says:
> bq. The configuration controls the maximum amount of time the client will wait for the response of a request. If the response is not received before the timeout elapses the client will resend the request if necessary or fail the request if retries are exhausted.
> This led us to expect that a TimeoutException would be retried implicitly according to the *retries* option, but that does not seem to happen.
> Strangely, we also noticed that if *request.timeout.ms* is set long enough, e.g. to the default 30000, then no messages are lost when killing a broker, even with *retries* set to 0.
> So it seems that the *retries* option does not apply in the broker-down scenario. There appears to be some other internal mechanism for handling broker failure and message retry, and that mechanism does not cover the TimeoutException case.
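For reference, KIP-91 introduced *delivery.timeout.ms* (shipped in Kafka 2.1.0) as an upper bound on the total time to deliver a record, decoupling batch expiry from *request.timeout.ms*. Below is a minimal sketch of a producer configured along those lines; it assumes a 2.1.0+ client, and the broker address and topic name are placeholders:

{code:title=DeliveryTimeoutSketch.java|borderStyle=solid}
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class DeliveryTimeoutSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder address
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // Per-request timeout: how long to wait for a single broker response.
        props.put("request.timeout.ms", "1000");
        // KIP-91 (Kafka 2.1.0+): upper bound on the total time from send()
        // returning until the callback fires, spanning time in the accumulator,
        // in-flight requests, and retries. Batches are no longer expired after
        // a single request.timeout.ms, so a brief leader failover is absorbed
        // by retries instead of surfacing as "Batch Expired".
        props.put("delivery.timeout.ms", "120000");
        props.put("retries", Integer.toString(Integer.MAX_VALUE));
        props.put("acks", "all");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("my-topic", "value"),
                    (metadata, exception) -> {
                        if (exception != null) {
                            // Fires only once delivery.timeout.ms has elapsed
                            // or retries are exhausted.
                            exception.printStackTrace();
                        }
                    });
        } // close() flushes pending records before returning
    }
}
{code}

As far as I can tell from the 0.9-era producer internals, on a pre-KIP-91 client such as 0.9.0.1 batches waiting in the accumulator (e.g. while a new partition leader is elected) are expired against *request.timeout.ms* itself, and expired batches fail the callback without consulting *retries*; that matches the observation above that a large *request.timeout.ms* avoids the loss even with *retries* set to 0.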