[ 
https://issues.apache.org/jira/browse/KAFKA-3358?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15239013#comment-15239013
 ] 

Ismael Juma commented on KAFKA-3358:
------------------------------------

Phil Luckhurst mentioned on the mailing list an additional aspect of this 
issue: the producer sends metadata requests very frequently if it is started 
but not used:

{noformat}
With debug logging turned on we've sometimes seen our logs filling up with the 
kafka producer sending metadata requests
every 100ms e.g.

2016-04-08 10:39:33,592 DEBUG [kafka-producer-network-thread | phil-pa-1] 
org.apache.kafka.clients.NetworkClient: Sending metadata request 
ClientRequest(expectResponse=true, callback=null, 
request=RequestSend(header={api_key=3,api_version=0,correlation_id=249,client_id=phil-pa-1},
 body={topics=[phil-pa-1-device-update]}), isInitiatedByNetworkClient, 
createdTimeMs=1460108373592, sendTimeMs=0) to node 0
2016-04-08 10:39:33,592 DEBUG [kafka-producer-network-thread | phil-pa-1] 
org.apache.kafka.clients.Metadata: Updated cluster metadata version 248 to 
Cluster(nodes = [Node(0, ta-eng-kafka2, 9092)], partitions = [Partition(topic = 
phil-pa-1-device-update, partition = 0, leader = 0, replicas = [0,], isr = 
[0,]])
2016-04-08 10:39:33,698 DEBUG [kafka-producer-network-thread | phil-pa-1] 
org.apache.kafka.clients.NetworkClient: Sending metadata request 
ClientRequest(expectResponse=true, callback=null, 
request=RequestSend(header={api_key=3,api_version=0,correlation_id=250,client_id=phil-pa-1},
 body={topics=[phil-pa-1-device-update]}), isInitiatedByNetworkClient, 
createdTimeMs=1460108373698, sendTimeMs=0) to node 0
2016-04-08 10:39:33,698 DEBUG [kafka-producer-network-thread | phil-pa-1] 
org.apache.kafka.clients.Metadata: Updated cluster metadata version 249 to 
Cluster(nodes = [Node(0, ta-eng-kafka2, 9092)], partitions = [Partition(topic = 
phil-pa-1-device-update, partition = 0, leader = 0, replicas = [0,], isr = 
[0,]])

These metadata requests continue to be sent every 100ms (retry.backoff.ms) 
until we stop the process.

This only seems to happen if the KafkaProducer instance is created but not used 
to publish a message for 5 minutes. After 5
minutes (metadata.max.age.ms) the producer thread sends a metadata request to 
the server that has an empty topics list and
the server responds with the partition information for *all* topics hosted on 
the server.

2016-04-11 14:16:39,320 DEBUG [kafka-producer-network-thread | phil-pa-1] 
org.apache.kafka.clients.NetworkClient: Sending metadata request 
ClientRequest(expectResponse=true, callback=null, 
request=RequestSend(header={api_key=3,api_version=0,correlation_id=0,client_id=phil-pa-1},
 body={topics=[]}), isInitiatedByNetworkClient, createdTimeMs=1460380599289, 
sendTimeMs=0) to node -1

If we then use that KafkaProducer instance to send a message, the next 'Sending 
metadata request' will be only for the topic we have
sent the message to, and this then triggers the flood of retry requests noted 
above.

If we ensure we send the first message within the time set by 
metadata.max.age.ms (default 5 minutes) then everything works as
expected and the metadata requests do not continually get retried.

I understand that creating a KafkaProducer and then not using 
it within 5 minutes is unusual in many cases, but in our case
we create it when our REST-based application starts up, and we can't 
guarantee that a message will be published within that
time. To get around this, we currently post a test message to the topic 
right after creating the KafkaProducer, which prevents the problem from
happening.
{noformat}
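
For a sense of scale, here is a minimal sketch (not part of Phil's report) that works out the request rate implied by the two settings he names. It uses only java.util.Properties. The class and method names are hypothetical, and it assumes one metadata request per retry.backoff.ms interval, as the log excerpt suggests.

```java
import java.util.Properties;

// Hypothetical helper, not part of the Kafka client.
final class MetadataRequestRate {

    // The two producer settings named in the report, with the values quoted there.
    static Properties reportedConfig() {
        Properties config = new Properties();
        config.setProperty("metadata.max.age.ms", "300000"); // 5 minutes
        config.setProperty("retry.backoff.ms", "100");       // 100 ms
        return config;
    }

    // Assumption: while the loop is active, one request is sent per backoff interval.
    static long requestsPerMinute(Properties config) {
        long backoffMs = Long.parseLong(config.getProperty("retry.backoff.ms"));
        return 60_000L / backoffMs;
    }

    public static void main(String[] args) {
        Properties config = reportedConfig();
        System.out.println(requestsPerMinute(config) + " metadata requests per minute");

        // Gap between the two createdTimeMs values in the log excerpt above.
        long gapMs = 1460108373698L - 1460108373592L;
        System.out.println(gapMs + " ms between the two logged requests");
    }
}
```

Under that assumption, the producer issues about 600 metadata requests per minute until the process is stopped, which is consistent with the 106 ms gap between the two logged requests.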

Phil investigated some more and said:

{noformat}
The request does succeed and the reason it keeps requesting is a check in the 
Sender.run(long now) method.

    public void run(long now) {
        Cluster cluster = metadata.fetch();
        // get the list of partitions with data ready to send
        RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

        // if there are any partitions whose leaders are not known yet, force metadata update
        if (result.unknownLeadersExist)
            this.metadata.requestUpdate();

It looks like the this.accumulator.ready(cluster, now) method checks the leader 
for each partition in the response against what it
already had. In this case the original metadata request had the empty topic 
list so got information for all partitions but after
using the producer the cluster only has the one topic in it which means this 
check sets unknownLeadersExist = true.

            Node leader = cluster.leaderFor(part);
            if (leader == null) {
                unknownLeadersExist = true;

As you can see above the Sender.run method checks for this in the result and 
then calls this.metadata.requestUpdate() which
triggers the metadata to be requested again. And of course the same thing 
happens when checking the next response and
we're suddenly in the loop forever.
{noformat}
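
The loop Phil describes can be illustrated with a toy model. This is a sketch under stated assumptions, not the actual Kafka code: the cluster is reduced to a map from partition name to leader id, the class and method names are hypothetical, the partition "other-topic-0" is made up, and every refresh is assumed to return the same single-topic view.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Toy model only; none of these names exist in the Kafka client.
final class MetadataLoopSketch {

    // Mirrors the quoted check: a tracked partition with no known leader
    // sets the "unknown leaders exist" flag.
    static boolean unknownLeadersExist(Map<String, Integer> cluster, Set<String> trackedPartitions) {
        for (String part : trackedPartitions) {
            Integer leader = cluster.get(part);
            if (leader == null) {
                return true;
            }
        }
        return false;
    }

    // Each pass stands for one Sender.run iteration. Assumption: the refreshed
    // metadata always contains the same partitions, so the outcome never changes.
    static int countUpdateRequests(Map<String, Integer> refreshedView, Set<String> trackedPartitions, int passes) {
        int requests = 0;
        for (int i = 0; i < passes; i++) {
            if (unknownLeadersExist(refreshedView, trackedPartitions)) {
                requests++; // stands in for metadata.requestUpdate()
            }
        }
        return requests;
    }

    public static void main(String[] args) {
        // Refreshed metadata only covers the topic the producer has sent to.
        Map<String, Integer> singleTopicView = new HashMap<>();
        singleTopicView.put("phil-pa-1-device-update-0", 0);

        // Partitions the check still looks at, including one from the earlier all-topics view.
        Set<String> tracked = new HashSet<>();
        tracked.add("phil-pa-1-device-update-0");
        tracked.add("other-topic-0"); // made-up partition for illustration

        System.out.println(countUpdateRequests(singleTopicView, tracked, 10));

        tracked.remove("other-topic-0");
        System.out.println(countUpdateRequests(singleTopicView, tracked, 10));
    }
}
```

In this model every pass requests another update as long as one tracked partition is missing from the refreshed view, and the requests stop once the tracked set matches what the metadata response contains.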

If we only update the cluster metadata and don't ask for any topics, this 
problem may also be resolved, but that needs to be verified.

> Only request metadata updates once we have topics or a pattern subscription
> ---------------------------------------------------------------------------
>
>                 Key: KAFKA-3358
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3358
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients
>    Affects Versions: 0.9.0.0, 0.9.0.1
>            Reporter: Ismael Juma
>            Assignee: Jason Gustafson
>            Priority: Critical
>             Fix For: 0.10.1.0
>
>
> The current code requests a metadata update for _all_ topics which can cause 
> major load issues in large clusters.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
