[
https://issues.apache.org/jira/browse/KAFKA-3358?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15239013#comment-15239013
]
Ismael Juma commented on KAFKA-3358:
------------------------------------
Phil Luckhurst mentioned on the mailing list that an additional aspect is that
we send metadata requests quite frequently if the producer is started but not
used:
{noformat}
With debug logging turned on, we've sometimes seen our logs fill up with the
Kafka producer sending metadata requests every 100ms, e.g.
2016-04-08 10:39:33,592 DEBUG [kafka-producer-network-thread | phil-pa-1]
org.apache.kafka.clients.NetworkClient: Sending metadata request
ClientRequest(expectResponse=true, callback=null,
request=RequestSend(header={api_key=3,api_version=0,correlation_id=249,client_id=phil-pa-1},
body={topics=[phil-pa-1-device-update]}), isInitiatedByNetworkClient,
createdTimeMs=1460108373592, sendTimeMs=0) to node 0
2016-04-08 10:39:33,592 DEBUG [kafka-producer-network-thread | phil-pa-1]
org.apache.kafka.clients.Metadata: Updated cluster metadata version 248 to
Cluster(nodes = [Node(0, ta-eng-kafka2, 9092)], partitions = [Partition(topic =
phil-pa-1-device-update, partition = 0, leader = 0, replicas = [0,], isr =
[0,]])
2016-04-08 10:39:33,698 DEBUG [kafka-producer-network-thread | phil-pa-1]
org.apache.kafka.clients.NetworkClient: Sending metadata request
ClientRequest(expectResponse=true, callback=null,
request=RequestSend(header={api_key=3,api_version=0,correlation_id=250,client_id=phil-pa-1},
body={topics=[phil-pa-1-device-update]}), isInitiatedByNetworkClient,
createdTimeMs=1460108373698, sendTimeMs=0) to node 0
2016-04-08 10:39:33,698 DEBUG [kafka-producer-network-thread | phil-pa-1]
org.apache.kafka.clients.Metadata: Updated cluster metadata version 249 to
Cluster(nodes = [Node(0, ta-eng-kafka2, 9092)], partitions = [Partition(topic =
phil-pa-1-device-update, partition = 0, leader = 0, replicas = [0,], isr =
[0,]])
These metadata requests continue to be sent every 100ms (retry.backoff.ms)
until we stop the process.
This only seems to happen if the KafkaProducer instance is created but not used
to publish a message for 5 minutes. After 5 minutes (metadata.max.age.ms) the
producer thread sends a metadata request with an empty topics list, and the
server responds with partition information for *all* topics it hosts.
2016-04-11 14:16:39,320 DEBUG [kafka-producer-network-thread | phil-pa-1]
org.apache.kafka.clients.NetworkClient: Sending metadata request
ClientRequest(expectResponse=true, callback=null,
request=RequestSend(header={api_key=3,api_version=0,correlation_id=0,client_id=phil-pa-1},
body={topics=[]}), isInitiatedByNetworkClient, createdTimeMs=1460380599289,
sendTimeMs=0) to node -1
If we then use that KafkaProducer instance to send a message, the next 'Sending
metadata request' is just for the topic we sent the message to, and this
triggers the flood of retry requests noted above.
If we ensure we send the first message within the time set by
metadata.max.age.ms (default 5 minutes), then everything works as expected and
the metadata requests are not continually retried.
In many cases creating a KafkaProducer and then not using it within 5 minutes
is unusual, but in our case we create it when our REST based application starts
up and we can't guarantee that a message will be published within that time. To
work around this, we currently post a test message to the topic right after
creating the KafkaProducer, which prevents it happening.
{noformat}
Phil investigated some more and said:
{noformat}
The request does succeed and the reason it keeps requesting is a check in the
Sender.run(long now) method.
public void run(long now) {
    Cluster cluster = metadata.fetch();
    // get the list of partitions with data ready to send
    RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
    // if there are any partitions whose leaders are not known yet, force metadata update
    if (result.unknownLeadersExist)
        this.metadata.requestUpdate();
It looks like this.accumulator.ready(cluster, now) checks the leader for each
partition against the cached cluster. In this case the original metadata
request had the empty topic list, so it got information for all partitions, but
after using the producer the cluster only contains the one topic, which means
this check sets unknownLeadersExist = true.
Node leader = cluster.leaderFor(part);
if (leader == null) {
    unknownLeadersExist = true;
As you can see above, the Sender.run method checks for this in the result and
then calls this.metadata.requestUpdate(), which triggers another metadata
request. The same thing happens when checking the next response, and we're
stuck in the loop forever.
{noformat}
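Phil's description of the loop can be sketched as a self-contained toy model (illustrative Java, not the actual Sender/Metadata code; the class and method names here are made up):

```java
import java.util.List;
import java.util.Map;

// Toy model of the refresh loop described above (hypothetical names, not
// real Kafka code). The ready() check asks the cached cluster view for a
// leader per partition; a null leader flags unknownLeadersExist, and the
// sender then forces another metadata request -- every retry.backoff.ms.
public class MetadataLoopDemo {

    // Mirrors "Node leader = cluster.leaderFor(part); if (leader == null) ..."
    // leaders maps topic-partition -> leader node id (the cached cluster view).
    public static boolean unknownLeadersExist(Map<String, Integer> leaders,
                                              List<String> partitionsChecked) {
        for (String part : partitionsChecked) {
            if (!leaders.containsKey(part)) {
                return true; // Sender.run would call metadata.requestUpdate()
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // After the bootstrap (topics=[]) request the cluster held all topics;
        // after the first send, the targeted update leaves only one of them.
        Map<String, Integer> clusterAfterTargetedUpdate =
                Map.of("phil-pa-1-device-update-0", 0);
        List<String> checked =
                List.of("phil-pa-1-device-update-0", "some-other-topic-0");

        // Every poll now flags an unknown leader and re-requests metadata.
        int metadataRequests = 0;
        for (int poll = 0; poll < 5; poll++) {
            if (unknownLeadersExist(clusterAfterTargetedUpdate, checked)) {
                metadataRequests++; // one request per retry.backoff.ms tick
            }
        }
        System.out.println(metadataRequests); // prints 5
    }
}
```

Once a partition that the check still inspects disappears from the cluster view, every pass through the loop re-requests metadata, matching the 100ms request flood in the logs above.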
It's possible that if we only update the cluster metadata and don't ask for any
topics, this also resolves itself, but that needs to be verified.
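The improvement in this issue's title can be sketched as a simple guard (a hypothetical sketch, not the committed fix): only send metadata requests once the client has concrete topics or a pattern subscription, so an idle client never falls back to the fetch-all (topics=[]) request.

```java
import java.util.Set;

// Sketch of the proposed guard (hypothetical names; not the actual patch):
// suppress metadata requests until the client actually needs topic data.
public class MetadataUpdateGuard {
    public static boolean shouldSendMetadataRequest(Set<String> topicsInUse,
                                                    boolean hasPatternSubscription) {
        // A pattern subscription legitimately needs the full topic list;
        // otherwise, only ask once at least one topic is in use.
        return hasPatternSubscription || !topicsInUse.isEmpty();
    }
}
```

Under such a guard, the idle-producer case reported above (no topics, no pattern) would send no metadata requests at all.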
> Only request metadata updates once we have topics or a pattern subscription
> ---------------------------------------------------------------------------
>
> Key: KAFKA-3358
> URL: https://issues.apache.org/jira/browse/KAFKA-3358
> Project: Kafka
> Issue Type: Improvement
> Components: clients
> Affects Versions: 0.9.0.0, 0.9.0.1
> Reporter: Ismael Juma
> Assignee: Jason Gustafson
> Priority: Critical
> Fix For: 0.10.1.0
>
>
> The current code requests a metadata update for _all_ topics which can cause
> major load issues in large clusters.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)