[ https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14257686#comment-14257686 ]

Bhavesh Mistry edited comment on KAFKA-1642 at 12/23/14 11:41 PM:
------------------------------------------------------------------

[~ewencp],

The patch indeed solves the high-CPU problem reported by this bug. I have tested with all brokers down, one broker down, and two brokers down (except for the last case, where one of the brokers runs out of socket file descriptors, a rare case). I am sorry for the late response; I got busy with other work, so testing was delayed.

Here are some interesting observations from YourKit:

0)  Overall, the patch has also brought down overall CPU consumption in the normal, healthy case where everything is up and running. With the old code (without the patch) I used to see about 10% of overall process CPU used by the I/O threads (4 in my case); with the patch it has dropped to 5% or less.

1)      When two brokers are down, I occasionally see the I/O thread blocked. (I did not see this when only one broker is down.)

{code}
kafka-producer-network-thread | rawlog [BLOCKED] [DAEMON]
org.apache.kafka.clients.producer.internals.Metadata.fetch() Metadata.java:70
java.lang.Thread.run() Thread.java:744
{code}
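For context, a BLOCKED state like the one above means the thread is waiting to enter a synchronized block whose monitor another thread holds (Metadata.fetch() appears to be a synchronized method). A minimal, self-contained illustration in plain Java (not Kafka code; all names here are made up for the demo):

```java
// Minimal illustration (not Kafka code) of how a thread ends up BLOCKED on a
// synchronized method while another thread holds the same monitor.
public class BlockedThreadDemo {
    static synchronized void holdLock() throws InterruptedException {
        Thread.sleep(500);                      // hold the class monitor briefly
    }
    static synchronized void fetch() { }        // contends for the same monitor

    public static void main(String[] args) throws InterruptedException {
        Thread holder = new Thread(() -> {
            try { holdLock(); } catch (InterruptedException ignored) { }
        });
        Thread fetcher = new Thread(BlockedThreadDemo::fetch);
        holder.start();
        Thread.sleep(100);                      // let the holder grab the monitor
        fetcher.start();
        Thread.sleep(100);                      // give the fetcher time to block
        System.out.println("fetcher state: " + fetcher.getState());
        holder.join();
        fetcher.join();
    }
}
```

Running this prints the fetcher thread in the BLOCKED state, matching what YourKit reports for the network thread.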

2)      The record-error-rate metric remains zero despite the firewall rules below. In my opinion, org.apache.kafka.clients.producer.Callback should have been invoked, but I did not see that happen with either one or two brokers down. Should I file another issue for this? Please confirm.

{code}
00100 reject tcp from me to b1.ip dst-port 9092
00200 reject tcp from me to b2.ip dst-port 9092
{code}

{code}
        class LoggingCallBackHandler implements Callback {

                /**
                 * A callback method the user can implement to provide asynchronous
                 * handling of request completion. This method will be called when the
                 * record sent to the server has been acknowledged. Exactly one of the
                 * arguments will be non-null.
                 *
                 * @param metadata
                 *            The metadata for the record that was sent (i.e. the
                 *            partition and offset). Null if an error occurred.
                 * @param exception
                 *            The exception thrown during processing of this record.
                 *            Null if no error occurred.
                 */
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                        if (exception != null) {
                                exception.printStackTrace();
                        }
                }
        }
{code}
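To rule out a bug in my handler, here is a self-contained sanity check of the callback contract described in the javadoc above (exactly one of the two arguments is non-null per completion). The RecordMetadata and Callback types here are simplified stand-ins, not the real Kafka classes:

```java
// Simplified stand-ins for the Kafka types, just to exercise the callback
// contract: exactly one of (metadata, exception) is non-null per completion.
public class CallbackContractDemo {
    static final class RecordMetadata {
        final int partition; final long offset;
        RecordMetadata(int partition, long offset) { this.partition = partition; this.offset = offset; }
    }
    interface Callback { void onCompletion(RecordMetadata metadata, Exception exception); }

    static final class CountingCallback implements Callback {
        int successes = 0, errors = 0;
        @Override public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception != null) errors++;     // failed send: exception is non-null
            else successes++;                    // acked send: metadata is non-null
        }
    }

    public static void main(String[] args) {
        CountingCallback cb = new CountingCallback();
        cb.onCompletion(new RecordMetadata(0, 42L), null);           // simulated ack
        cb.onCompletion(null, new Exception("broker unreachable"));  // simulated failure
        System.out.println("successes=" + cb.successes + " errors=" + cb.errors);
    }
}
```

So the handler itself behaves as expected when invoked; the problem is that the producer never invokes it while the brokers are firewalled off.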

I do not see any exceptions at all on the console, and I am not sure why.

3)      The application does NOT shut down gracefully when one or more brokers are down. (The I/O thread never exits; this is a known issue.)

{code}
"SIGTERM handler" daemon prio=5 tid=0x00007f8bd79e4000 nid=0x17907 waiting for monitor entry [0x000000011e906000]
   java.lang.Thread.State: BLOCKED (on object monitor)
        at java.lang.Shutdown.exit(Shutdown.java:212)
        - waiting to lock <0x000000070008f7c0> (a java.lang.Class for java.lang.Shutdown)
        at java.lang.Terminator$1.handle(Terminator.java:52)
        at sun.misc.Signal$1.run(Signal.java:212)
        at java.lang.Thread.run(Thread.java:744)

"SIGTERM handler" daemon prio=5 tid=0x00007f8bd5159000 nid=0x1cb0b waiting for monitor entry [0x000000011e803000]
   java.lang.Thread.State: BLOCKED (on object monitor)
        at java.lang.Shutdown.exit(Shutdown.java:212)
        - waiting to lock <0x000000070008f7c0> (a java.lang.Class for java.lang.Shutdown)
        at java.lang.Terminator$1.handle(Terminator.java:52)
        at sun.misc.Signal$1.run(Signal.java:212)
        at java.lang.Thread.run(Thread.java:744)

"SIGTERM handler" daemon prio=5 tid=0x00007f8bdd147800 nid=0x15d0b waiting for monitor entry [0x000000011e30a000]
   java.lang.Thread.State: BLOCKED (on object monitor)
        at java.lang.Shutdown.exit(Shutdown.java:212)
        - waiting to lock <0x000000070008f7c0> (a java.lang.Class for java.lang.Shutdown)
        at java.lang.Terminator$1.handle(Terminator.java:52)
        at sun.misc.Signal$1.run(Signal.java:212)
        at java.lang.Thread.run(Thread.java:744)

"SIGTERM handler" daemon prio=5 tid=0x00007f8bdf820000 nid=0x770b waiting for monitor entry [0x000000011e207000]
   java.lang.Thread.State: BLOCKED (on object monitor)
        at java.lang.Shutdown.exit(Shutdown.java:212)
        - waiting to lock <0x000000070008f7c0> (a java.lang.Class for java.lang.Shutdown)
        at java.lang.Terminator$1.handle(Terminator.java:52)
        at sun.misc.Signal$1.run(Signal.java:212)
        at java.lang.Thread.run(Thread.java:744)

"SIGTERM handler" daemon prio=5 tid=0x00007f8bdc393800 nid=0x1c30f waiting for monitor entry [0x000000011e104000]
   java.lang.Thread.State: BLOCKED (on object monitor)
        at java.lang.Shutdown.exit(Shutdown.java:212)
        - waiting to lock <0x000000070008f7c0> (a java.lang.Class for java.lang.Shutdown)
        at java.lang.Terminator$1.handle(Terminator.java:52)
        at sun.misc.Signal$1.run(Signal.java:212)
        at java.lang.Thread.run(Thread.java:744)

"Thread-4" prio=5 tid=0x00007f8bdb39f000 nid=0xa107 in Object.wait() [0x000000011ea89000]
   java.lang.Thread.State: WAITING (on object monitor)
        at java.lang.Object.$$YJP$$wait(Native Method)
        at java.lang.Object.wait(Object.java)
        at java.lang.Thread.join(Thread.java:1280)
        - locked <0x0000000705c2f650> (a org.apache.kafka.common.utils.KafkaThread)
        at java.lang.Thread.join(Thread.java:1354)
        at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:322)
        at 

"kafka-producer-network-thread | error" daemon prio=5 tid=0x00007f8bd814e000 nid=0x7403 runnable [0x000000011e6c0000]
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.KQueueArrayWrapper.$$YJP$$kevent0(Native Method)
        at sun.nio.ch.KQueueArrayWrapper.kevent0(KQueueArrayWrapper.java)
        at sun.nio.ch.KQueueArrayWrapper.poll(KQueueArrayWrapper.java:200)
        at sun.nio.ch.KQueueSelectorImpl.doSelect(KQueueSelectorImpl.java:103)
        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:87)
        - locked <0x0000000705c109f8> (a sun.nio.ch.Util$2)
        - locked <0x0000000705c109e8> (a java.util.Collections$UnmodifiableSet)
        - locked <0x0000000705c105c8> (a sun.nio.ch.KQueueSelectorImpl)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:98)
        at org.apache.kafka.common.network.Selector.select(Selector.java:322)
        at org.apache.kafka.common.network.Selector.poll(Selector.java:212)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:184)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
        at java.lang.Thread.run(Thread.java:744)
{code}
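As a stop-gap for the hang above, the application can bound the close by running it on a daemon helper thread and waiting with a timeout, so SIGTERM handling is never blocked indefinitely. A self-contained sketch (the sleeping Runnable is a stand-in for the never-returning producer.close(); the 60-second sleep and 2-second timeout are arbitrary demo values):

```java
// Hedged workaround sketch: run a potentially-blocking close() on a daemon
// thread and bound the wait with join(timeout) so shutdown can proceed.
public class BoundedCloseDemo {
    public static void main(String[] args) throws InterruptedException {
        // Stand-in for producer.close(), which in this report never returns
        // while the brokers are down.
        Runnable slowClose = () -> {
            try { Thread.sleep(60_000); } catch (InterruptedException e) { /* give up */ }
        };
        Thread closer = new Thread(slowClose, "producer-closer");
        closer.setDaemon(true);          // do not block JVM exit if close hangs
        closer.start();
        closer.join(2_000);              // wait at most 2 seconds for close()
        if (closer.isAlive()) {
            System.out.println("close() timed out; proceeding with shutdown");
        }
    }
}
```

Because the helper is a daemon thread, the JVM can still exit even though the simulated close() is stuck; the same pattern would keep a hung producer from pinning the SIGTERM handlers shown above.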

Thank you for the patch.

Thanks,
Bhavesh



> [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network 
> connection is lost
> ---------------------------------------------------------------------------------------
>
>                 Key: KAFKA-1642
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1642
>             Project: Kafka
>          Issue Type: Bug
>          Components: producer 
>    Affects Versions: 0.8.2
>            Reporter: Bhavesh Mistry
>            Assignee: Ewen Cheslack-Postava
>            Priority: Blocker
>             Fix For: 0.8.2
>
>         Attachments: 
> 0001-Initial-CPU-Hish-Usage-by-Kafka-FIX-and-Also-fix-CLO.patch, 
> KAFKA-1642.patch, KAFKA-1642.patch, KAFKA-1642_2014-10-20_17:33:57.patch, 
> KAFKA-1642_2014-10-23_16:19:41.patch
>
>
> I see my CPU spike to 100% when network connection is lost for while.  It 
> seems network  IO thread are very busy logging following error message.  Is 
> this expected behavior ?
> 2014-09-17 14:06:16.830 [kafka-producer-network-thread] ERROR 
> org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka 
> producer I/O thread: 
> java.lang.IllegalStateException: No entry found for node -2
> at 
> org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:110)
> at 
> org.apache.kafka.clients.ClusterConnectionStates.disconnected(ClusterConnectionStates.java:99)
> at 
> org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:394)
> at 
> org.apache.kafka.clients.NetworkClient.maybeUpdateMetadata(NetworkClient.java:380)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:174)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:175)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
> at java.lang.Thread.run(Thread.java:744)
> Thanks,
> Bhavesh



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
