I reconsidered this issue, and I think it is much worse than I explained 
before.

IIUC, in short, setting {{retry.backoff.ms}} to a larger value can delay 
KafkaProducer from updating outdated metadata.
That is, if we set {{retry.backoff.ms}} to 1 second, for example, and a 
partition leadership failover happens, the producer can take up to 1 second to 
send a metadata request in the worst case, even though it can detect the broker 
disconnection or the outdated partition leadership information.
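
To make the timing concrete, here is a minimal sketch of the decision as I understand it. It is a simplified model and not the actual client code: the class name {{MetadataBackoffSketch}}, the method signature and the timestamps are all made up for illustration. The assumption is that a forced update (for example after a disconnect) removes the wait for metadata expiry, but the backoff measured from the last refresh still applies.

```java
// Simplified, hypothetical model of when the producer may next refresh metadata.
// Illustrative only; names and structure are assumptions, not the Kafka client API.
final class MetadataBackoffSketch {

    /** Milliseconds to wait before the next metadata request may be sent. */
    static long timeToNextUpdate(boolean needUpdate,
                                 long lastRefreshMs,
                                 long refreshBackoffMs,
                                 long metadataExpireMs,
                                 long nowMs) {
        // A forced update (e.g. after a disconnect) removes the expiry wait...
        long timeToExpire = needUpdate
                ? 0
                : Math.max(lastRefreshMs + metadataExpireMs - nowMs, 0);
        // ...but the backoff since the last refresh still has to elapse.
        long timeToAllowUpdate = lastRefreshMs + refreshBackoffMs - nowMs;
        return Math.max(timeToExpire, timeToAllowUpdate);
    }

    public static void main(String[] args) {
        long lastRefreshMs = 0;           // time of the first metadata update
        long disconnectMs = 3_200;        // a broker goes down about 3.2 s later
        long metadataExpireMs = 300_000;  // metadata.max.age.ms default (5 minutes)

        for (long backoffMs : new long[] {100, 1_000, 10_000}) {
            long wait = timeToNextUpdate(true, lastRefreshMs, backoffMs,
                                         metadataExpireMs, disconnectMs);
            System.out.printf("retry.backoff.ms=%d -> wait %d ms before refreshing%n",
                              backoffMs, wait);
        }
    }
}
```

Under this model, a backoff of 100 ms or 1 second has already elapsed by the time the broker disappears, so the refresh can go out at once, while a backoff of 10 seconds keeps the producer on stale metadata for another 6.8 seconds.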

Here's the result of my experiment. I modified 
{{KafkaProducerMetadataUpdateDurationTest}} and observed DEBUG logs of 
NetworkClient and Metadata.

import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

public final class KafkaProducerMetadataUpdateDurationTest {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.setProperty(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, "1000");
        String retryBackoffMs = System.getProperty("retry.backoff.ms");
        System.err.println("Experimenting with retry.backoff.ms = " + retryBackoffMs);
        // ... (remaining producer settings not shown)

        Producer<String, String> producer =
                new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());

        try {
            int i = 0;
            while (true) {
                final int produceSeq = i++;
                final long t0 = System.nanoTime();
                // Partition is chosen round-robin over the 3 partitions of "test".
                producer.send(new ProducerRecord<>("test", produceSeq % 3, "key", "value"),
                              new Callback() {
                                  public void onCompletion(RecordMetadata metadata, Exception exception) {
                                      // Time from calling send() until the broker acknowledged (or failed) the record.
                                      long produceDuration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - t0);
                                      System.err.printf("Produce[%d]: duration=%d, exception=%s\n", produceSeq, produceDuration, exception);
                                  }
                              });
                // Time spent blocked inside send() itself.
                long sendDuration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - t0);
                System.err.printf("Send[%d]: duration=%d\n", produceSeq, sendDuration);
                // ... (rest of the loop body not shown)
            }
        } finally {
            // ... (body not shown)
        }
    }
}

log4j.rootLogger=INFO, stdout

log4j.logger.org.apache.kafka.clients.Metadata=DEBUG, stdout
log4j.logger.org.apache.kafka.clients.NetworkClient=DEBUG, stdout
log4j.logger.org.apache.kafka.clients.producer.internals.Sender=DEBUG, stdout

log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n

Topic "test" has 3 replicas and 3 partitions.
Then I started KafkaProducerMetadataUpdateDurationTest, and stopped broker 1 
manually at (*2). Here's the log:

./bin/kafka-run-class.sh -Dlog4j.configuration=file:./log4j.properties 
-Dretry.backoff.ms=10000 KafkaProducerMetadataUpdateDurationTest
Experimenting with retry.backoff.ms = 10000
[2016-09-02 22:36:29,839] INFO Kafka version : 
[2016-09-02 22:36:29,839] INFO Kafka commitId : 8f3462552fa4d6a6 
[2016-09-02 22:36:39,826] DEBUG Initialize connection to node -2 for sending 
metadata request (org.apache.kafka.clients.NetworkClient)
[2016-09-02 22:36:39,826] DEBUG Initiating connection to node -2 at 
HOST-2:9092. (org.apache.kafka.clients.NetworkClient)
[2016-09-02 22:36:39,883] DEBUG Completed connection to node -2 

# *1 The first metadata request
[2016-09-02 22:36:39,902] DEBUG Sending metadata request {topics=[test]} to 
node -2 (org.apache.kafka.clients.NetworkClient)
[2016-09-02 22:36:39,929] DEBUG Updated cluster metadata version 2 to 
Cluster(nodes = [HOST-2:9092 (id: 2 rack: null), HOST-1:9092 (id: 1 rack: 
null), HOST-3:9092 (id: 3 rack: null)], partitions = [Partition(topic = test, 
partition = 1, leader = 1, replicas = [1,2,3,], isr = [2,3,1,]), 
Partition(topic = test, partition = 0, leader = 3, replicas = [1,2,3,], isr = 
[3,2,1,]), Partition(topic = test, partition = 2, leader = 2, replicas = 
[1,2,3,], isr = [3,2,1,])]) (org.apache.kafka.clients.Metadata)
Send[0]: duration=10104
[2016-09-02 22:36:39,944] DEBUG Initiating connection to node 3 at HOST-3:9092. 
[2016-09-02 22:36:39,947] DEBUG Completed connection to node 3 
Produce[0]: duration=10117, exception=null
Send[1]: duration=0
[2016-09-02 22:36:40,950] DEBUG Initiating connection to node 1 at HOST-1:9092. 
[2016-09-02 22:36:40,952] DEBUG Completed connection to node 1 
Produce[1]: duration=12, exception=null
Send[2]: duration=0
[2016-09-02 22:36:41,955] DEBUG Initiating connection to node 2 at HOST-2:9092. 
[2016-09-02 22:36:41,958] DEBUG Completed connection to node 2 
Produce[2]: duration=5, exception=null
Send[3]: duration=0
Produce[3]: duration=4, exception=null

# *2 I stopped broker 1 at this moment

[2016-09-02 22:36:43,134] DEBUG Node 1 disconnected. 
Send[4]: duration=0
[2016-09-02 22:36:44,137] DEBUG Initiating connection to node 1 at HOST-1:9092. 
[2016-09-02 22:36:44,139] DEBUG Node 1 disconnected. 
Send[5]: duration=0
Produce[5]: duration=4, exception=null
[2016-09-02 22:36:45,141] DEBUG Initiating connection to node 1 at HOST-1:9092. 
[2016-09-02 22:36:45,143] DEBUG Node 1 disconnected. 
Send[6]: duration=0
Produce[6]: duration=3, exception=null
[2016-09-02 22:36:46,148] DEBUG Initiating connection to node 1 at HOST-1:9092. 
[2016-09-02 22:36:46,150] DEBUG Node 1 disconnected. 
Send[7]: duration=0
[2016-09-02 22:36:47,154] DEBUG Initiating connection to node 1 at HOST-1:9092. 
[2016-09-02 22:36:47,156] DEBUG Node 1 disconnected. 
Send[8]: duration=0
Produce[8]: duration=5, exception=null
[2016-09-02 22:36:48,159] DEBUG Initiating connection to node 1 at HOST-1:9092. 
[2016-09-02 22:36:48,161] DEBUG Node 1 disconnected. 
Send[9]: duration=0
Produce[9]: duration=3, exception=null
[2016-09-02 22:36:49,165] DEBUG Initiating connection to node 1 at HOST-1:9092. 
[2016-09-02 22:36:49,168] DEBUG Node 1 disconnected. 

# *3 The second metadata update happens exactly 10 seconds after the first update.
[2016-09-02 22:36:49,914] DEBUG Sending metadata request {topics=[test]} to 
node 3 (org.apache.kafka.clients.NetworkClient)
[2016-09-02 22:36:49,918] DEBUG Updated cluster metadata version 3 to 
Cluster(nodes = [HOST-2:9092 (id: 2 rack: null), HOST-3:9092 (id: 3 rack: 
null)], partitions = [Partition(topic = test, partition = 1, leader = 2, 
replicas = [1,2,3,], isr = [2,3,]), Partition(topic = test, partition = 0, 
leader = 3, replicas = [1,2,3,], isr = [3,2,]), Partition(topic = test, 
partition = 2, leader = 2, replicas = [1,2,3,], isr = [3,2,])]) 
Produce[4]: duration=5957, exception=null
Produce[7]: duration=2946, exception=null
Send[10]: duration=0
Produce[10]: duration=4, exception=null

First, as I already explained, the first send() blocked for an extremely long 
time because refreshBackoffMs was applied unintentionally (*1).
Then I stopped broker 1 at (*2). I think what we expect here is that 
KafkaProducer immediately tries to update metadata in order to fail over to the 
new leader, but it doesn't until 10 seconds (= retry.backoff.ms) have elapsed 
since the first update (*3).

This leads to the following bad effects:
- Increased produce latency
- Buffer full due to accumulated records
- Batch expiration by elapsing {{request.timeout.ms}} : 

