[ https://issues.apache.org/jira/browse/KAFKA-4024?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15458639#comment-15458639 ]
Yuto Kawamura commented on KAFKA-4024:
--------------------------------------

I reconsidered this issue and found that it is much worse than I explained before. IIUC, in short, setting {{retry.backoff.ms}} to a larger value can delay KafkaProducer's update of outdated metadata. That is, when we set {{retry.backoff.ms}} to 1 second, for example, and a partition leadership failover happens, the producer takes up to 1 second to fire a metadata request in the worst case, even though it has already detected the broker disconnection or the outdated partition leadership information.

Here's the result of my experiment. I modified {{KafkaProducerMetadataUpdateDurationTest}} as follows and observed the DEBUG logs of NetworkClient and Metadata.

KafkaProducerMetadataUpdateDurationTest.java:
{code}
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

public final class KafkaProducerMetadataUpdateDurationTest {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "HOST-1:9092,HOST-2:9092,HOST-3:9092");
        props.setProperty(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, "1000");
        props.setProperty(ProducerConfig.RETRIES_CONFIG, String.valueOf(Integer.MAX_VALUE));
        String retryBackoffMs = System.getProperty("retry.backoff.ms");
        System.err.println("Experimenting with retry.backoff.ms = " + retryBackoffMs);
        props.setProperty(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, retryBackoffMs);
        Producer<String, String> producer =
                new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
        try {
            int i = 0;
            while (true) {
                final int produceSeq = i++;
                final long t0 = System.nanoTime();
                producer.send(new ProducerRecord<>("test", produceSeq % 3, "key", "value"), new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        long produceDuration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - t0);
                        System.err.printf("Produce[%d]: duration=%d, exception=%s\n", produceSeq, produceDuration, exception);
                    }
                });
                long sendDuration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - t0);
                System.err.printf("Send[%d]: duration=%d\n", produceSeq, sendDuration);
                Thread.sleep(1000);
            }
        } finally {
            producer.close();
        }
    }
}
{code}

log4j.properties:
{code}
log4j.rootLogger=INFO, stdout
log4j.logger.org.apache.kafka.clients.Metadata=DEBUG, stdout
log4j.additivity.org.apache.kafka.clients.Metadata=false
log4j.logger.org.apache.kafka.clients.NetworkClient=DEBUG, stdout
log4j.additivity.org.apache.kafka.clients.NetworkClient=false
log4j.logger.org.apache.kafka.clients.producer.internals.Sender=DEBUG, stdout
log4j.additivity.org.apache.kafka.clients.producer.internals.Sender=false
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
{code}

Topic "test" has 3 replicas and 3 partitions. I then started KafkaProducerMetadataUpdateDurationTest and stopped broker 1 manually at (*2). Here's the log:

{code}
./bin/kafka-run-class.sh -Dlog4j.configuration=file:./log4j.properties -Dretry.backoff.ms=10000 KafkaProducerMetadataUpdateDurationTest
Experimenting with retry.backoff.ms = 10000
...
[2016-09-02 22:36:29,839] INFO Kafka version : 0.10.1.0-SNAPSHOT (org.apache.kafka.common.utils.AppInfoParser)
[2016-09-02 22:36:29,839] INFO Kafka commitId : 8f3462552fa4d6a6 (org.apache.kafka.common.utils.AppInfoParser)
[2016-09-02 22:36:39,826] DEBUG Initialize connection to node -2 for sending metadata request (org.apache.kafka.clients.NetworkClient)
[2016-09-02 22:36:39,826] DEBUG Initiating connection to node -2 at HOST-2:9092. (org.apache.kafka.clients.NetworkClient)
[2016-09-02 22:36:39,883] DEBUG Completed connection to node -2 (org.apache.kafka.clients.NetworkClient)
# *1 The first metadata request
[2016-09-02 22:36:39,902] DEBUG Sending metadata request {topics=[test]} to node -2 (org.apache.kafka.clients.NetworkClient)
[2016-09-02 22:36:39,929] DEBUG Updated cluster metadata version 2 to Cluster(nodes = [HOST-2:9092 (id: 2 rack: null), HOST-1:9092 (id: 1 rack: null), HOST-3:9092 (id: 3 rack: null)], partitions = [Partition(topic = test, partition = 1, leader = 1, replicas = [1,2,3,], isr = [2,3,1,]), Partition(topic = test, partition = 0, leader = 3, replicas = [1,2,3,], isr = [3,2,1,]), Partition(topic = test, partition = 2, leader = 2, replicas = [1,2,3,], isr = [3,2,1,])]) (org.apache.kafka.clients.Metadata)
Send[0]: duration=10104
[2016-09-02 22:36:39,944] DEBUG Initiating connection to node 3 at HOST-3:9092. (org.apache.kafka.clients.NetworkClient)
[2016-09-02 22:36:39,947] DEBUG Completed connection to node 3 (org.apache.kafka.clients.NetworkClient)
Produce[0]: duration=10117, exception=null
Send[1]: duration=0
[2016-09-02 22:36:40,950] DEBUG Initiating connection to node 1 at HOST-1:9092. (org.apache.kafka.clients.NetworkClient)
[2016-09-02 22:36:40,952] DEBUG Completed connection to node 1 (org.apache.kafka.clients.NetworkClient)
Produce[1]: duration=12, exception=null
Send[2]: duration=0
[2016-09-02 22:36:41,955] DEBUG Initiating connection to node 2 at HOST-2:9092. (org.apache.kafka.clients.NetworkClient)
[2016-09-02 22:36:41,958] DEBUG Completed connection to node 2 (org.apache.kafka.clients.NetworkClient)
Produce[2]: duration=5, exception=null
Send[3]: duration=0
Produce[3]: duration=4, exception=null
# *2 I stopped broker 1 at this moment
[2016-09-02 22:36:43,134] DEBUG Node 1 disconnected. (org.apache.kafka.clients.NetworkClient)
Send[4]: duration=0
[2016-09-02 22:36:44,137] DEBUG Initiating connection to node 1 at HOST-1:9092. (org.apache.kafka.clients.NetworkClient)
[2016-09-02 22:36:44,139] DEBUG Node 1 disconnected. (org.apache.kafka.clients.NetworkClient)
Send[5]: duration=0
Produce[5]: duration=4, exception=null
[2016-09-02 22:36:45,141] DEBUG Initiating connection to node 1 at HOST-1:9092. (org.apache.kafka.clients.NetworkClient)
[2016-09-02 22:36:45,143] DEBUG Node 1 disconnected. (org.apache.kafka.clients.NetworkClient)
Send[6]: duration=0
Produce[6]: duration=3, exception=null
[2016-09-02 22:36:46,148] DEBUG Initiating connection to node 1 at HOST-1:9092. (org.apache.kafka.clients.NetworkClient)
[2016-09-02 22:36:46,150] DEBUG Node 1 disconnected. (org.apache.kafka.clients.NetworkClient)
Send[7]: duration=0
[2016-09-02 22:36:47,154] DEBUG Initiating connection to node 1 at HOST-1:9092. (org.apache.kafka.clients.NetworkClient)
[2016-09-02 22:36:47,156] DEBUG Node 1 disconnected. (org.apache.kafka.clients.NetworkClient)
Send[8]: duration=0
Produce[8]: duration=5, exception=null
[2016-09-02 22:36:48,159] DEBUG Initiating connection to node 1 at HOST-1:9092. (org.apache.kafka.clients.NetworkClient)
[2016-09-02 22:36:48,161] DEBUG Node 1 disconnected. (org.apache.kafka.clients.NetworkClient)
Send[9]: duration=0
Produce[9]: duration=3, exception=null
[2016-09-02 22:36:49,165] DEBUG Initiating connection to node 1 at HOST-1:9092. (org.apache.kafka.clients.NetworkClient)
[2016-09-02 22:36:49,168] DEBUG Node 1 disconnected. (org.apache.kafka.clients.NetworkClient)
# *3 The second metadata update, exactly 10 seconds after the first update
[2016-09-02 22:36:49,914] DEBUG Sending metadata request {topics=[test]} to node 3 (org.apache.kafka.clients.NetworkClient)
[2016-09-02 22:36:49,918] DEBUG Updated cluster metadata version 3 to Cluster(nodes = [HOST-2:9092 (id: 2 rack: null), HOST-3:9092 (id: 3 rack: null)], partitions = [Partition(topic = test, partition = 1, leader = 2, replicas = [1,2,3,], isr = [2,3,]), Partition(topic = test, partition = 0, leader = 3, replicas = [1,2,3,], isr = [3,2,]), Partition(topic = test, partition = 2, leader = 2, replicas = [1,2,3,], isr = [3,2,])]) (org.apache.kafka.clients.Metadata)
Produce[4]: duration=5957, exception=null
Produce[7]: duration=2946, exception=null
Send[10]: duration=0
Produce[10]: duration=4, exception=null
{code}

First, as I already explained, the first send() blocked insanely long due to the unintentionally applied refreshBackoffMs (*1). Then I stopped broker 1 at (*2). What we expect here is that KafkaProducer immediately tries to update its metadata in order to fail over to the new leader of the affected partitions, but it doesn't do so until 10 seconds (= retry.backoff.ms) have elapsed since the first update (*3).
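The backoff gating behind both delays can be condensed into a small sketch. This is a simplified model of the decision described above, not the actual Metadata class; the class name, method signature, and constants (retry.backoff.ms = 10000, the metadata.max.age.ms default of 300000) are illustrative assumptions:

```java
// Simplified sketch of the metadata-update gating described above.
// Not the real Kafka Metadata class; names and values are illustrative.
public final class MetadataBackoffSketch {
    static final long REFRESH_BACKOFF_MS = 10_000L;  // retry.backoff.ms in the experiment
    static final long METADATA_EXPIRE_MS = 300_000L; // metadata.max.age.ms default

    // The next update is allowed at the LATER of "metadata expired" and
    // "backoff since the last refresh elapsed". lastRefreshMs is stamped even
    // when the "refresh" only installed the bootstrap cluster, so the very
    // first real metadata request is still gated by the backoff.
    static long timeToNextUpdate(long nowMs, long lastRefreshMs, boolean needUpdate) {
        long timeToExpire = needUpdate ? 0 : Math.max(lastRefreshMs + METADATA_EXPIRE_MS - nowMs, 0);
        long timeToAllowUpdate = Math.max(lastRefreshMs + REFRESH_BACKOFF_MS - nowMs, 0);
        return Math.max(timeToExpire, timeToAllowUpdate);
    }

    public static void main(String[] args) {
        // The constructor "updates" metadata with only bootstrap nodes at t=0,
        // stamping lastRefreshMs. The first send() needs real partition info
        // (needUpdate=true), yet is still backed off for the full 10 seconds:
        System.out.println(timeToNextUpdate(0, 0, true));      // 10000
        // Only after retry.backoff.ms has elapsed is the request allowed:
        System.out.println(timeToNextUpdate(10_000, 0, true)); // 0
    }
}
```

The same gating explains the failover case: a broker disconnection marks the metadata as needing an update, but the request is still deferred until the backoff since the previous refresh has elapsed.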
This leads to the following bad effects:
- Increased produce latency
- Buffer exhaustion due to accumulated records
- Batch expiration once {{request.timeout.ms}} elapses: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java#L153-L156

> First metadata update always takes retry.backoff.ms milliseconds to complete
> -----------------------------------------------------------------------------
>
>                 Key: KAFKA-4024
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4024
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 0.9.0.1, 0.10.0.0
>            Reporter: Yuto Kawamura
>            Assignee: Yuto Kawamura
>
> Recently I updated our KafkaProducer configuration; specifically, we adjusted {{retry.backoff.ms}} from the default (100ms) to 1000ms.
> After that we observed that the first {{send()}} started taking longer than before. Investigating, I found the following facts.
> Environment:
> - Kafka broker 0.9.0.1
> - Kafka producer 0.9.0.1
> Our current version is 0.9.0.1, but this reproduces with the latest build from the trunk branch as well.
> h2. TL;DR
> The first {{KafkaProducer.send()}} always blocks for {{retry.backoff.ms}} milliseconds, due to an unintentionally applied backoff on the first metadata update.
> h2. Proof
> I wrote the following test code and placed it under clients/main/java/:
> {code}
> import java.util.Properties;
> import java.util.concurrent.TimeUnit;
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.Producer;
> import org.apache.kafka.clients.producer.ProducerConfig;
> import org.apache.kafka.common.serialization.ByteArraySerializer;
> public final class KafkaProducerMetadataUpdateDurationTest {
>     public static void main(String[] args) {
>         Properties props = new Properties();
>         props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>         props.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "30000");
>         String retryBackoffMs = System.getProperty("retry.backoff.ms");
>         System.err.println("Experimenting with retry.backoff.ms = " + retryBackoffMs);
>         props.setProperty(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, retryBackoffMs);
>         Producer<byte[], byte[]> producer =
>                 new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer());
>         long t0 = System.nanoTime();
>         try {
>             producer.partitionsFor("test");
>             long duration = System.nanoTime() - t0;
>             System.err.println("Duration = " + TimeUnit.NANOSECONDS.toMillis(duration) + " ms");
>         } finally {
>             producer.close();
>         }
>     }
> }
> {code}
> Here's the experiment log:
> {code}
> # Start zookeeper & kafka broker
> ./bin/zookeeper-server-start.sh config/zookeeper.properties
> ./bin/kafka-server-start.sh config/server.properties
> # Create test topic
> ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test --replication-factor 1 --partitions 1
> $ ./bin/kafka-run-class.sh -Dretry.backoff.ms=100 KafkaProducerMetadataUpdateDurationTest
> Experimenting with retry.backoff.ms = 100
> Duration = 175 ms
> $ ./bin/kafka-run-class.sh -Dretry.backoff.ms=1000 KafkaProducerMetadataUpdateDurationTest
> Experimenting with retry.backoff.ms = 1000
> Duration = 1066 ms
> $ ./bin/kafka-run-class.sh -Dretry.backoff.ms=10000 KafkaProducerMetadataUpdateDurationTest
> Experimenting with retry.backoff.ms = 10000
> Duration = 10070 ms
> {code}
> As you can see, the duration of {{partitionsFor()}} increases linearly in proportion to the value of {{retry.backoff.ms}}.
> Here is the scenario that leads to this behavior:
> 1. KafkaProducer initializes its metadata with the given {{bootstrap.servers}} and the current timestamp: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L276
> 2. On the first {{send()}}, KafkaProducer requests a metadata update due to the missing partition info: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L527
> 3. However, DefaultMetadataUpdater doesn't actually send a MetadataRequest, because {{metadata.timeToNextUpdate}} returns a value larger than zero: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L541-L548
> 4. {{Metadata.timeToNextUpdate}} returns the larger of the time until metadata expiration and the time until the backoff expires; but practically, needUpdate is always true the first time, so timeToAllowUpdate is always adopted, and it never becomes zero until {{retry.backoff.ms}} has elapsed since the first {{metadata.update()}}: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/Metadata.java#L116
> This is because the Kafka client tries to keep an interval of {{retry.backoff.ms}} between metadata updates. That basically works fine from the second update onward, but for the first update, since the client has never obtained actual metadata (which only a MetadataRequest can provide), this backoff makes no sense; in fact it harms our application by blocking the first {{send()}} insanely long.

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
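The four-step scenario quoted above can be played out on a fake clock. The sketch below is illustrative only (the class and method names are hypothetical, not Kafka's): it shows why, in this idealized model, the first update's delay equals {{retry.backoff.ms}} exactly, matching the linear growth in the measured durations.

```java
// Hypothetical condensation of steps 1-4 from the quoted issue description.
// A fake clock jumps forward instead of sleeping; names are illustrative.
public final class FirstSendBackoffSimulation {
    // Returns how long the first metadata update is deferred for a given
    // retry.backoff.ms, assuming the constructor stamped lastRefreshMs at t=0.
    static long firstUpdateDelayMs(long retryBackoffMs) {
        long now = 0;
        long lastRefreshMs = now;  // step 1: constructor "update" with bootstrap servers
        boolean needUpdate = true; // step 2: first send() requests a metadata update
        long waited = 0;
        // steps 3-4: the updater refuses to send a MetadataRequest while the
        // backoff since lastRefreshMs has not elapsed.
        while (needUpdate) {
            long timeToAllowUpdate = Math.max(lastRefreshMs + retryBackoffMs - now, 0);
            if (timeToAllowUpdate == 0) {
                needUpdate = false;       // MetadataRequest finally sent
            } else {
                now += timeToAllowUpdate; // fake clock: jump to when it is allowed
                waited += timeToAllowUpdate;
            }
        }
        return waited;
    }

    public static void main(String[] args) {
        // Mirrors the shape of the measured durations (175 ms, 1066 ms, 10070 ms):
        // the delay grows linearly with retry.backoff.ms.
        System.out.println(firstUpdateDelayMs(100));    // 100
        System.out.println(firstUpdateDelayMs(1000));   // 1000
        System.out.println(firstUpdateDelayMs(10000));  // 10000
    }
}
```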