[ https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14223755#comment-14223755 ]
Ewen Cheslack-Postava commented on KAFKA-1642:
----------------------------------------------
Ok, so as I suspected, you need to wait a while before the issue shows up. It
looks to me like this is due to a metadata refresh. This causes metadataTimeout
in NetworkClient.poll() to be 0, but then maybeUpdateMetadata is unable to make any
progress since it can't connect to any nodes. The previous patch fixed issues
that caused the timeout parameter to that method to be 0, so this is a similar
issue. However, under normal testing it won't always show up immediately -- you
need to wait until the next metadata refresh, which is currently every 5
minutes.
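
To make the failure mode concrete, here is a minimal sketch. It is not the
actual NetworkClient code: the class and method names are hypothetical, and it
only assumes that poll() blocks for the smaller of the caller's timeout and
the time remaining until the next metadata refresh.

```java
class PollTimeoutSketch {
    // Hypothetical helper: time left until the next metadata refresh, clamped at 0.
    static long timeToNextRefreshMs(long lastRefreshMs, long refreshIntervalMs, long nowMs) {
        return Math.max(0L, lastRefreshMs + refreshIntervalMs - nowMs);
    }

    // Hypothetical: the select timeout is the smaller of the caller's timeout
    // and the metadata timeout.
    static long selectTimeoutMs(long callerTimeoutMs, long metadataTimeoutMs) {
        return Math.min(callerTimeoutMs, metadataTimeoutMs);
    }

    public static void main(String[] args) {
        long refreshIntervalMs = 5 * 60 * 1000L; // the 5 minute refresh mentioned above
        long callerTimeoutMs = 100L;             // assumed value, for illustration only

        // Refresh not yet due: the caller's timeout wins and the thread can block.
        long early = timeToNextRefreshMs(0L, refreshIntervalMs, 1000L);
        System.out.println(selectTimeoutMs(callerTimeoutMs, early));

        // Refresh overdue: the metadata timeout is 0, so every poll returns
        // immediately. If no node can be reached, nothing resets it and the
        // loop spins.
        long overdue = timeToNextRefreshMs(0L, refreshIntervalMs, refreshIntervalMs + 1L);
        System.out.println(selectTimeoutMs(callerTimeoutMs, overdue));
    }
}
```

Once the refresh is overdue and cannot complete, the second computation keeps
yielding 0 on every pass, which is consistent with the CPU spike only
appearing after the refresh interval has elapsed.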
I need to think more about the details of the fix. That timeout shouldn't
consistently be 0 if we're just trying to refresh metadata, but we need to make
sure we select an appropriate timeout for each case. Looking through
maybeUpdateMetadata there are a few different possibilities:
1. leastLoadedNode returns null, leaving no nodes available and we don't even
try to refresh
2. The selected node is connected and we can send more data - we mark
metadataFetchInProgress to avoid resending requests, but should probably also
use that to determine the timeout on poll()
3. The selected node is connected but we can't send more data yet
4. The selected node is not connected, but we are allowed to try to initiate a
connection based on the reconnection backoff.
4a. Trying to initiate the connection may return an immediate error
4b. Or we'll need to wait for the connection event.
5. The selected node is not connected and we aren't allowed to initiate a new
connection yet.
Given that all these conditions are based on the code in maybeUpdateMetadata
(and initiateConnect, which it calls), it probably makes sense to have that
code return an appropriate timeout to be used in poll(). But we need to make
sure the selected values are also combined correctly with the timeout passed
into poll() and that any wakeups before that time also subsequently produce
correct values.
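
As a rough illustration of that idea, the sketch below maps each of the cases
listed above to a timeout and then combines it with the timeout passed into
poll(). Everything here is hypothetical: the enum, the method names, and the
specific values chosen per case are assumptions for discussion, not the
actual patch.

```java
class MetadataTimeoutSketch {
    // Hypothetical states mirroring cases 1-5 above.
    enum NodeState { NONE_AVAILABLE, READY_TO_SEND, CONNECTED_NOT_READY, CAN_CONNECT, BACKING_OFF }

    // Hypothetical: the timeout maybeUpdateMetadata could hand back to poll().
    static long metadataTimeoutMs(NodeState state, long backoffRemainingMs, long reconnectBackoffMs) {
        switch (state) {
            case READY_TO_SEND:       // case 2: request sent, a response will wake us
            case CONNECTED_NOT_READY: // case 3: an I/O event will wake us
                return Long.MAX_VALUE; // impose no extra limit of our own
            case NONE_AVAILABLE:      // case 1: nothing to try right now
            case CAN_CONNECT:         // case 4: connection initiated (or failed at once)
                return reconnectBackoffMs;
            case BACKING_OFF:         // case 5: wait out the remaining backoff
                return Math.max(0L, backoffRemainingMs);
            default:
                throw new IllegalArgumentException("unknown state: " + state);
        }
    }

    // Combine with the timeout passed into poll(); never negative.
    static long pollTimeoutMs(long callerTimeoutMs, long metadataTimeoutMs) {
        return Math.max(0L, Math.min(callerTimeoutMs, metadataTimeoutMs));
    }

    public static void main(String[] args) {
        long callerTimeoutMs = 100L;    // assumed value
        long reconnectBackoffMs = 50L;  // assumed value
        for (NodeState s : NodeState.values()) {
            long t = pollTimeoutMs(callerTimeoutMs, metadataTimeoutMs(s, 40L, reconnectBackoffMs));
            System.out.println(s + " -> " + t + " ms");
        }
    }
}
```

The point of the sketch is only that no branch returns 0 unless the backoff
has actually expired, so a node that cannot be reached does not turn the loop
into a busy wait.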
The logic in Sender.run() and NetworkClient.poll() is complex and needs to
handle a lot of different cases, but it should be possible to fix this problem
just by adjusting that code, without adding retries/backoff further up the
stack. The core of this problem is simply that the loop is selecting too small
a timeout.
> [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network
> connection is lost
> ---------------------------------------------------------------------------------------
>
> Key: KAFKA-1642
> URL: https://issues.apache.org/jira/browse/KAFKA-1642
> Project: Kafka
> Issue Type: Bug
> Components: producer
> Affects Versions: 0.8.2
> Reporter: Bhavesh Mistry
> Assignee: Ewen Cheslack-Postava
> Fix For: 0.8.2
>
> Attachments:
> 0001-Initial-CPU-Hish-Usage-by-Kafka-FIX-and-Also-fix-CLO.patch,
> KAFKA-1642.patch, KAFKA-1642_2014-10-20_17:33:57.patch,
> KAFKA-1642_2014-10-23_16:19:41.patch
>
>
> I see my CPU spike to 100% when the network connection is lost for a while. It
> seems the network I/O thread is very busy logging the following error message.
> Is this expected behavior?
> 2014-09-17 14:06:16.830 [kafka-producer-network-thread] ERROR org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka producer I/O thread:
> java.lang.IllegalStateException: No entry found for node -2
> at org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:110)
> at org.apache.kafka.clients.ClusterConnectionStates.disconnected(ClusterConnectionStates.java:99)
> at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:394)
> at org.apache.kafka.clients.NetworkClient.maybeUpdateMetadata(NetworkClient.java:380)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:174)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:175)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
> at java.lang.Thread.run(Thread.java:744)
> Thanks,
> Bhavesh