[ https://issues.apache.org/jira/browse/KAFKA-4515?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15735355#comment-15735355 ]

Rajini Sivaram commented on KAFKA-4515:
---------------------------------------

This is being addressed in KIP-91: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-91+Provide+Intuitive+User+Timeouts+in+The+Producer
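KIP-91 shipped in Kafka 2.1 and added a `delivery.timeout.ms` producer setting: an upper bound on the time from `send()` returning until the record is reported as succeeded or failed, covering time spent buffered, awaiting broker acknowledgement, and in retries. As we understand the KIP, a single timed-out request is then retried while delivery time remains, instead of the batch expiring after `request.timeout.ms`. The sketch below only builds a `java.util.Properties` object so it runs without kafka-clients; the bootstrap address and the timeout values are illustrative assumptions, and the up-front check mirrors the documented requirement that `delivery.timeout.ms` be at least `linger.ms + request.timeout.ms`.

```java
import java.util.Properties;

public class DeliveryTimeoutConfig {
    // Builds producer settings that use the KIP-91 timeout (Kafka 2.1+).
    // Only java.util.Properties is used, so this runs without kafka-clients;
    // in a real application the result would be passed to new KafkaProducer<>(props).
    static Properties producerProps(String bootstrapServers, int deliveryTimeoutMs,
                                    int requestTimeoutMs, int lingerMs) {
        // Documented constraint: delivery.timeout.ms >= linger.ms + request.timeout.ms.
        if (deliveryTimeoutMs < lingerMs + requestTimeoutMs) {
            throw new IllegalArgumentException(
                "delivery.timeout.ms must be >= linger.ms + request.timeout.ms");
        }
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Upper bound on the time from send() returning to success or failure,
        // including buffering, awaiting acks, and retries.
        props.put("delivery.timeout.ms", Integer.toString(deliveryTimeoutMs));
        // Timeout for a single request; within the delivery timeout, a timed-out
        // request is retried rather than failing the record immediately.
        props.put("request.timeout.ms", Integer.toString(requestTimeoutMs));
        props.put("linger.ms", Integer.toString(lingerMs));
        return props;
    }

    public static void main(String[] args) {
        // Illustrative values: the 1000 ms request timeout from the report,
        // bounded overall by a 120000 ms delivery timeout (the documented default).
        Properties props = producerProps("localhost:9092", 120000, 1000, 5);
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

With these values a broker failure that takes a few seconds to resolve would be absorbed by retries inside the two-minute delivery window, rather than surfacing as an expired batch after one second.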


> Async producer send not retrying on TimeoutException: Batch Expired
> -------------------------------------------------------------------
>
>                 Key: KAFKA-4515
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4515
>             Project: Kafka
>          Issue Type: Bug
>          Components: producer 
>    Affects Versions: 0.9.0.1
>            Reporter: Di Shang
>
> We are testing broker-failure resiliency. We have a cluster of 3 brokers and
> a topic with 5 partitions and 2 replicas. The replicas are evenly distributed,
> and every broker leads at least one partition. We use the code below to
> continuously send messages, then kill one of the brokers to see whether any
> messages are lost.
> {code:title=MyTest.java|borderStyle=solid}
> import java.util.Properties;
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.ProducerRecord;
>
> public class MyTest {
>     static volatile KafkaProducer<Void, String> producer;
>
>     public static void send(ProducerRecord<Void, String> record) {
>         producer.send(record, (metadata, exception) -> {
>             if (exception != null) {
>                 // Handle the exception with a manual retry.
>                 System.out.println("Error, resending...");
>                 exception.printStackTrace();
>                 try {
>                     // Note: this callback runs on the producer's I/O thread,
>                     // so sleeping here delays sending of all other records.
>                     Thread.sleep(100);
>                 } catch (InterruptedException e) {
>                     Thread.currentThread().interrupt();
>                 }
>                 // send(record); // without this retry, the message is lost
>             } else if (metadata != null) {
>                 System.out.println("Sent " + record);
>             } else {
>                 System.out.println("No exception and no metadata");
>             }
>         });
>     }
>
>     public static void main(String[] args) throws Exception {
>         Properties props = new Properties();
>         props.put("bootstrap.servers", "...");
>         props.put("key.serializer",
>             "org.apache.kafka.common.serialization.StringSerializer");
>         props.put("value.serializer",
>             "org.apache.kafka.common.serialization.StringSerializer");
>         props.put("retries", "100000");
>         props.put("acks", "1");
>         props.put("request.timeout.ms", "1000");
>         producer = new KafkaProducer<>(props);
>         long i = 1L;
>         while (true) {
>             ProducerRecord<Void, String> record =
>                 new ProducerRecord<>("my-topic", Long.toString(i));
>             send(record);
>             Thread.sleep(100);
>             i++;
>         }
>     }
> }
> {code}
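The KafkaProducer javadoc notes that `send()` callbacks generally execute on the producer's I/O thread and should be fast, so a `Thread.sleep(100)` inside the callback stalls delivery of every other record. One way to keep a manual retry without blocking that thread is to hand the backoff to a separate scheduler. The sketch below is self-contained: `AsyncSend` is a hypothetical stand-in for `producer.send(record, callback)` so the retry logic can run without a broker, and `maxAttempts` and `backoffMs` are illustrative parameters of this sketch, not producer settings.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

public class RetryingSender {
    // Hypothetical stand-in for producer.send(record, callback): reports either
    // an offset (success) or an exception (failure) through the callback.
    interface AsyncSend {
        void send(String value, BiConsumer<Long, Exception> callback);
    }

    private final AsyncSend delegate;
    private final int maxAttempts;
    private final long backoffMs;
    // Dedicated thread for backoff delays, so callbacks never sleep.
    private final ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor();

    RetryingSender(AsyncSend delegate, int maxAttempts, long backoffMs) {
        this.delegate = delegate;
        this.maxAttempts = maxAttempts;
        this.backoffMs = backoffMs;
    }

    // Completes with the offset on success, or exceptionally with the last
    // error once maxAttempts sends have failed.
    CompletableFuture<Long> send(String value) {
        CompletableFuture<Long> result = new CompletableFuture<>();
        attempt(value, 1, result);
        return result;
    }

    private void attempt(String value, int attemptNo, CompletableFuture<Long> result) {
        delegate.send(value, (offset, error) -> {
            if (error == null) {
                result.complete(offset);
            } else if (attemptNo >= maxAttempts) {
                result.completeExceptionally(error);
            } else {
                // Schedule the next attempt instead of sleeping, so the thread
                // that invoked this callback returns immediately.
                scheduler.schedule(() -> attempt(value, attemptNo + 1, result),
                                   backoffMs, TimeUnit.MILLISECONDS);
            }
        });
    }

    void close() {
        scheduler.shutdown();
    }

    public static void main(String[] args) throws Exception {
        // Fake sender: fails the first two attempts, then succeeds with offset 42.
        int[] calls = {0};
        AsyncSend flaky = (value, callback) -> {
            calls[0]++;
            if (calls[0] <= 2) {
                callback.accept(null, new RuntimeException("Batch Expired (simulated)"));
            } else {
                callback.accept(42L, null);
            }
        };
        RetryingSender sender = new RetryingSender(flaky, 5, 10);
        try {
            System.out.println(sender.send("hello").get(1, TimeUnit.SECONDS)); // prints 42
        } finally {
            sender.close();
        }
    }
}
```

A retry issued from the callback is sent after records that were submitted later, so per-partition ordering is not preserved. With a real producer, the delegate would wrap `producer.send` and pass `metadata.offset()` or the exception to the callback.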
> We found that when *request.timeout.ms* is set to a small value such as
> 1000, killing a broker produces a few "TimeoutException: Batch Expired"
> errors in the send() callback. Unless we handle these with an explicit retry,
> as in the code above, those messages are lost.
> The documentation for *request.timeout.ms* says:
> bq. The configuration controls the maximum amount of time the client will 
> wait for the response of a request. If the response is not received before 
> the timeout elapses the client will resend the request if necessary or fail 
> the request if retries are exhausted.
> This suggests that a TimeoutException should be retried implicitly according
> to the *retries* option, but that does not seem to happen.
> Strangely, we also noticed that if *request.timeout.ms* is long enough, such
> as the default of 30000, we don't lose any messages when killing a broker,
> even with *retries* set to 0.
> So it seems that the *retries* option does not take effect in the broker-down
> scenario. There appears to be some other internal mechanism for handling
> broker failure and message retries, and that mechanism does not work once a
> TimeoutException occurs.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
