[
https://issues.apache.org/jira/browse/KAFKA-4024?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15458699#comment-15458699
]
Yuto Kawamura commented on KAFKA-4024:
--------------------------------------
Updated the PR to fix this issue, not only for the first metadata update but also
for the cases I explained in my previous comment.
After applying my patch, the first send() is no longer blocked by the first
metadata update (*4), and the second metadata update happens immediately after
the KafkaProducer detects a broker disconnection (*5, *6).
{code}
Experimenting with retry.backoff.ms = 10000
...
[2016-09-02 22:48:22,936] INFO Kafka version : 0.10.1.0-SNAPSHOT
(org.apache.kafka.common.utils.AppInfoParser)
[2016-09-02 22:48:22,936] INFO Kafka commitId : 8f3462552fa4d6a6
(org.apache.kafka.common.utils.AppInfoParser)
[2016-09-02 22:48:22,939] DEBUG Initialize connection to node -2 for sending
metadata request (org.apache.kafka.clients.NetworkClient)
[2016-09-02 22:48:22,939] DEBUG Initiating connection to node -2 at
HOST-2:9092. (org.apache.kafka.clients.NetworkClient)
[2016-09-02 22:48:23,001] DEBUG Completed connection to node -2
(org.apache.kafka.clients.NetworkClient)
# *4 The first metadata update happens immediately.
[2016-09-02 22:48:23,020] DEBUG Sending metadata request {topics=[test]} to
node -2 (org.apache.kafka.clients.NetworkClient)
[2016-09-02 22:48:23,043] DEBUG Updated cluster metadata version 2 to
Cluster(nodes = [HOST-1:9092 (id: 1 rack: null), HOST-2:9092 (id: 2 rack:
null), HOST-3:9092 (id: 3 rack: null)], partitions = [Partition(topic = test,
partition = 1, leader = 1, replicas = [1,2,3,], isr = [2,3,1,]),
Partition(topic = test, partition = 0, leader = 3, replicas = [1,2,3,], isr =
[3,2,1,]), Partition(topic = test, partition = 2, leader = 2, replicas =
[1,2,3,], isr = [3,2,1,])]) (org.apache.kafka.clients.Metadata)
Send[0]: duration=119
[2016-09-02 22:48:23,057] DEBUG Initiating connection to node 3 at HOST-3:9092.
(org.apache.kafka.clients.NetworkClient)
[2016-09-02 22:48:23,060] DEBUG Completed connection to node 3
(org.apache.kafka.clients.NetworkClient)
Produce[0]: duration=129, exception=null
Send[1]: duration=0
[2016-09-02 22:48:24,060] DEBUG Initiating connection to node 1 at HOST-1:9092.
(org.apache.kafka.clients.NetworkClient)
[2016-09-02 22:48:24,062] DEBUG Completed connection to node 1
(org.apache.kafka.clients.NetworkClient)
Produce[1]: duration=10, exception=null
Send[2]: duration=0
[2016-09-02 22:48:25,066] DEBUG Initiating connection to node 2 at HOST-2:9092.
(org.apache.kafka.clients.NetworkClient)
[2016-09-02 22:48:25,068] DEBUG Completed connection to node 2
(org.apache.kafka.clients.NetworkClient)
Produce[2]: duration=6, exception=null
Send[3]: duration=0
Produce[3]: duration=4, exception=null
# *5 I stopped broker 1 at this moment
[2016-09-02 22:48:26,301] DEBUG Node 1 disconnected.
(org.apache.kafka.clients.NetworkClient)
# *6 The metadata is updated immediately after the producer detects the broker
disconnection
[2016-09-02 22:48:26,301] DEBUG Sending metadata request {topics=[test]} to
node 2 (org.apache.kafka.clients.NetworkClient)
[2016-09-02 22:48:26,308] DEBUG Updated cluster metadata version 3 to
Cluster(nodes = [HOST-3:9092 (id: 3 rack: null), HOST-1:9092 (id: 1 rack:
null), HOST-2:9092 (id: 2 rack: null)], partitions = [Partition(topic = test,
partition = 1, leader = 2, replicas = [1,2,3,], isr = [2,3,]), Partition(topic
= test, partition = 0, leader = 3, replicas = [1,2,3,], isr = [3,2,]),
Partition(topic = test, partition = 2, leader = 2, replicas = [1,2,3,], isr =
[3,2,])]) (org.apache.kafka.clients.Metadata)
Send[4]: duration=0
Produce[4]: duration=4, exception=null
{code}
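For reference, the Send[n]/Produce[n] lines above are printed by a small driver roughly
like the following sketch. This is not the exact code used in the experiment; the class
name, topic, record size and 1-second send interval are assumptions made for illustration.
{code}
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public final class KafkaProducerSendDurationTest {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "HOST-1:9092,HOST-2:9092,HOST-3:9092");
        props.setProperty(ProducerConfig.RETRY_BACKOFF_MS_CONFIG,
                System.getProperty("retry.backoff.ms"));

        Producer<byte[], byte[]> producer =
                new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer());
        try {
            for (int i = 0; i < 100; i++) {
                final int seq = i;
                final long t0 = System.nanoTime();
                // Send[i] measures how long send() itself blocks (any metadata wait shows up here).
                producer.send(new ProducerRecord<byte[], byte[]>("test", new byte[16]), new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        // Produce[i] measures the time until the broker acknowledges the record.
                        long ms = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - t0);
                        System.err.println("Produce[" + seq + "]: duration=" + ms + ", exception=" + exception);
                    }
                });
                System.err.println("Send[" + seq + "]: duration="
                        + TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - t0));
                Thread.sleep(1000);
            }
        } finally {
            producer.close();
        }
    }
}
{code}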
> First metadata update always takes retry.backoff.ms milliseconds to complete
> ---------------------------------------------------------------------------
>
> Key: KAFKA-4024
> URL: https://issues.apache.org/jira/browse/KAFKA-4024
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Affects Versions: 0.9.0.1, 0.10.0.0
> Reporter: Yuto Kawamura
> Assignee: Yuto Kawamura
>
> Recently I updated our KafkaProducer configuration; specifically, we adjusted
> {{retry.backoff.ms}} from the default (100ms) to 1000ms.
> After that we observed that the first {{send()}} started taking longer than
> before; I investigated and found the following facts.
> Environment:
> - Kafka broker 0.9.0.1
> - Kafka producer 0.9.0.1
> Our current version is 0.9.0.1, but the issue also reproduces with the latest
> build from the trunk branch.
> h2. TL;DR
> The first {{KafkaProducer.send()}} always blocks for {{retry.backoff.ms}}
> milliseconds, due to a backoff that is unintentionally applied to the first
> metadata update.
> h2. Proof
> I wrote the following test code and placed it under clients/main/java/:
> {code}
> import java.util.Properties;
> import java.util.concurrent.TimeUnit;
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.Producer;
> import org.apache.kafka.clients.producer.ProducerConfig;
> import org.apache.kafka.common.serialization.ByteArraySerializer;
> public final class KafkaProducerMetadataUpdateDurationTest {
>     public static void main(String[] args) {
>         Properties props = new Properties();
>         props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>         props.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "30000");
>         String retryBackoffMs = System.getProperty("retry.backoff.ms");
>         System.err.println("Experimenting with retry.backoff.ms = " + retryBackoffMs);
>         props.setProperty(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, retryBackoffMs);
>         Producer<byte[], byte[]> producer =
>                 new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer());
>         long t0 = System.nanoTime();
>         try {
>             producer.partitionsFor("test");
>             long duration = System.nanoTime() - t0;
>             System.err.println("Duration = " + TimeUnit.NANOSECONDS.toMillis(duration) + " ms");
>         } finally {
>             producer.close();
>         }
>     }
> }
> {code}
> Here's experiment log:
> {code}
> # Start zookeeper & kafka broker
> ./bin/zookeeper-server-start.sh config/zookeeper.properties
> ./bin/kafka-server-start.sh config/server.properties
> # Create test topic
> ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test
> --replication-factor 1 --partitions 1
> $ ./bin/kafka-run-class.sh -Dretry.backoff.ms=100
> KafkaProducerMetadataUpdateDurationTest
> Experimenting with retry.backoff.ms = 100
> Duration = 175 ms
> $ ./bin/kafka-run-class.sh -Dretry.backoff.ms=1000
> KafkaProducerMetadataUpdateDurationTest
> Experimenting with retry.backoff.ms = 1000
> Duration = 1066 ms
> $ ./bin/kafka-run-class.sh -Dretry.backoff.ms=10000
> KafkaProducerMetadataUpdateDurationTest
> Experimenting with retry.backoff.ms = 10000
> Duration = 10070 ms
> {code}
> As you can see, the duration of {{partitionsFor()}} increases linearly with
> the value of {{retry.backoff.ms}}.
> Here is the scenario that leads to this behavior:
> 1. KafkaProducer initializes its metadata with the given {{bootstrap.servers}}
> and the current timestamp:
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L276
> 2. On the first {{send()}}, KafkaProducer requests a metadata update because
> the partition info is missing:
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L527
> 3. However, DefaultMetadataUpdater doesn't actually send the MetadataRequest,
> because {{metadata.timeToNextUpdate}} returns a value larger than zero:
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L541-L548
> 4. {{Metadata.timeToNextUpdate}} returns the larger of the time until metadata
> expiration and the time until the backoff expires. In practice, {{needUpdate}}
> is always true for the first update, so {{timeToAllowUpdate}} is always the
> value adopted, and it never reaches zero until {{retry.backoff.ms}} has elapsed
> since the first {{metadata.update()}} (see the sketch just below):
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/Metadata.java#L116
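> The logic in step 4 boils down to roughly the following (a simplified paraphrase of
> {{Metadata.timeToNextUpdate}}, not a verbatim copy of the source):
> {code}
> // Simplified paraphrase of Metadata.timeToNextUpdate(); not a verbatim copy.
> public synchronized long timeToNextUpdate(long nowMs) {
>     // needUpdate is true right after requestUpdate() (e.g. on the first send()),
>     // so the expiration term is zero and only the backoff term below matters.
>     long timeToExpire = needUpdate ? 0 : Math.max(lastSuccessfulRefreshMs + metadataExpireMs - nowMs, 0);
>     // lastRefreshMs was set to "now" by the bootstrap update() in the constructor, so
>     // this term stays positive until retry.backoff.ms (refreshBackoffMs) has elapsed.
>     long timeToAllowUpdate = lastRefreshMs + refreshBackoffMs - nowMs;
>     // The larger of the two wins, hence the first update waits out the backoff.
>     return Math.max(timeToExpire, timeToAllowUpdate);
> }
> {code}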
> This backoff exists because the Kafka client tries to keep an interval of
> {{retry.backoff.ms}} between metadata updates. That basically works fine from
> the second update onwards, but for the first update the producer has never
> obtained actual metadata (which only comes from the MetadataRequest), so the
> backoff makes no sense here; in fact it harms our application by blocking the
> first {{send()}} for an unreasonably long time.