[ https://issues.apache.org/jira/browse/KAFKA-3686?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15328781#comment-15328781 ]
radha commented on KAFKA-3686:
------------------------------

We have a similar issue: even with a very large retry count, the producer decides to skip messages when there is a network issue. In addition to what the OP asked, at a minimum there should be a way to get a handle on the message after the last retry, or after everything fails. The only option with an async send is a completion callback, where even the RecordMetadata is empty (as expected, because there was no server communication). But what about the record itself, and the number of retries that happened?

Error producing to topic:
{code}
org.apache.kafka.common.errors.TimeoutException: Batch Expired
{code}

Producer configuration:
{code}
reconnect.backoff.ms = 100
retry.backoff.ms = 100
buffer.memory = 33554432
timeout.ms = 30000
connections.max.idle.ms = 540000
max.in.flight.requests.per.connection = 5
metrics.num.samples = 2
request.timeout.ms = 5000
acks = 1
batch.size = 16384
receive.buffer.bytes = 32768
retries = 10000000
max.request.size = 1048576
metrics.sample.window.ms = 30000
send.buffer.bytes = 131072
linger.ms = 10
{code}

> Kafka producer is not fault tolerant
> ------------------------------------
>
>                 Key: KAFKA-3686
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3686
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.9.0.1
>            Reporter: Luca Bruno
>
> *Setup*
> I have a cluster of 3 Kafka servers, a topic with 12 partitions with replication factor 2, and a ZooKeeper cluster of 3 nodes.
> Producer config:
> {code}
> props.put("bootstrap.servers", "k1:9092,k2:9092,k3:9092");
> props.put("acks", "1");
> props.put("batch.size", 16384);
> props.put("retries", 3);
> props.put("buffer.memory", 33554432);
> props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
> props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
> {code}
> Producer code:
> {code}
> Producer<String, String> producer = new KafkaProducer<>(props);
> for (int i = 0; i < 10; i++) {
>     Future<RecordMetadata> f = producer.send(
>         new ProducerRecord<String, String>("topic", null, Integer.toString(i)));
>     f.get();
> }
> {code}
>
> *Problem*
> Cut the network between the producer (p1) and one of the Kafka servers (say k1).
> The cluster is healthy, hence the Kafka bootstrap tells the producer that there are 3 Kafka servers (as I understood it), and the leaders of the partitions of the topic.
> So the producer will send messages to the leader of each partition. If the leader happens to be k1 for a message, the producer raises the following exception after request.timeout.ms:
> {code}
> Exception in thread "main" java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Batch Expired
>         at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:56)
>         at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:43)
>         at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:25)
>         at Test.main(Test.java:25)
> Caused by: org.apache.kafka.common.errors.TimeoutException: Batch Expired
> {code}
> In theory, the application should handle the failure. In practice, messages are getting lost, even though the other 2 leaders are available for writing.
> I tried with acks set to both 1 and -1.
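A note on the loss of context described above (and in the comment at the top: on failure the completion callback only sees an empty RecordMetadata): application code can close over the record and a retry counter itself, so both survive the final failure. The sketch below is a broker-free simulation under made-up names (FakeSender, sendWithHandle); it is not the kafka-clients API, only an illustration of the closure pattern:

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

public class RecordHandleSketch {
    /** Simulated send attempt; a stand-in for a broker round trip. */
    interface FakeSender { boolean trySend(String record); }

    /**
     * Retries up to maxRetries times. On final failure, the original
     * record is handed to the failure callback and the attempt count is
     * visible to the caller: exactly the two pieces of information the
     * commenter notes are missing from the plain metadata callback.
     */
    static boolean sendWithHandle(FakeSender sender, String record,
                                  int maxRetries,
                                  Consumer<String> onFinalFailure,
                                  AtomicInteger attemptsOut) {
        for (int attempt = 1; attempt <= maxRetries; attempt++) {
            attemptsOut.set(attempt);
            if (sender.trySend(record)) return true;
        }
        onFinalFailure.accept(record); // the record itself is still in scope
        return false;
    }
}
```

The same closure trick works with the real async send: any local variable captured by the callback, including the ProducerRecord that was passed in, remains reachable when the callback fires with an error.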
> *What I expected*
> Given that the client library automatically decides the hashing / round-robin scheme for the partition, I would say it's not very important which partition the message is sent to.
> I expect the client library to handle the failure by sending the message to a partition with a different leader.
> Neither kafka-clients nor rdkafka handles this failure. Given that those are the main client libraries used for Kafka, as far as I know, I find this a serious problem in terms of fault tolerance.
> EDIT: I cannot add comments to this issue, I don't understand why. To answer [~fpj]: yes, I want the first. In the case of a network partition I want to ensure my messages are stored. If the libraries don't do that, it means I have to reimplement them, or otherwise postpone sending such messages until the network partition resolves (which means implementing some kind of on-disk backlog in the producer, which should instead be Kafka's purpose after all). In either case, it's something that is not documented, and it's very inconvenient.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
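The behaviour the reporter expects, plus the backlog fallback from the EDIT (try a partition whose leader is still reachable, and park the message locally only when every leader is down), can be sketched without a broker. Everything here (the class, the boolean trySend predicate, the in-memory queue) is an illustrative assumption; kafka-clients provides neither the partition fallback nor the backlog:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.function.BiPredicate;

public class FallbackBacklogSketch {
    private final Deque<String> backlog = new ArrayDeque<>();

    /**
     * Try the preferred partition first; if its (simulated) leader times
     * out, fall back to the remaining partitions. If every leader is
     * unreachable, park the record in the backlog instead of dropping it.
     * Returns the partition that accepted the record, or -1 if backlogged.
     */
    int send(BiPredicate<Integer, String> trySend, List<Integer> partitions,
             int preferred, String record) {
        if (trySend.test(preferred, record)) return preferred;
        for (int p : partitions) {
            if (p != preferred && trySend.test(p, record)) return p;
        }
        backlog.addLast(record); // all leaders down: keep the record
        return -1;
    }

    /** Re-drive the backlog in order once connectivity returns. */
    int drain(BiPredicate<Integer, String> trySend, List<Integer> partitions) {
        int delivered = 0;
        while (!backlog.isEmpty()) {
            String r = backlog.peekFirst();
            boolean sent = false;
            for (int p : partitions) {
                if (trySend.test(p, r)) { sent = true; break; }
            }
            if (!sent) break;     // still partitioned; retry later
            backlog.removeFirst();
            delivered++;
        }
        return delivered;
    }

    int pending() { return backlog.size(); }
}
```

Note the trade-off the reporter raises: falling back to a different partition silently changes message placement, which only makes sense when, as stated above, the application does not care which partition a message lands on.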