[ https://issues.apache.org/jira/browse/KAFKA-3159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15136071#comment-15136071 ]

Jason Gustafson commented on KAFKA-3159:
----------------------------------------

Either should work, but perhaps it would be most useful at the moment to try 
against 0.9.0.

> Kafka consumer 0.9.0.0 client poll is very CPU intensive under certain 
> conditions
> ----------------------------------------------------------------------------------
>
>                 Key: KAFKA-3159
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3159
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 0.9.0.0
>         Environment: Linux, Oracle JVM 8.
>            Reporter: Rajiv Kurian
>            Assignee: Jason Gustafson
>             Fix For: 0.9.0.1
>
>         Attachments: Memory-profile-patched-client.png, Screen Shot 
> 2016-02-01 at 11.09.32 AM.png
>
>
> We are using the new Kafka consumer with the following config (as logged by 
> Kafka):
>         metric.reporters = []
>         metadata.max.age.ms = 300000
>         value.deserializer = class 
> org.apache.kafka.common.serialization.ByteArrayDeserializer
>         group.id = myGroup.id
>         partition.assignment.strategy = 
> [org.apache.kafka.clients.consumer.RangeAssignor]
>         reconnect.backoff.ms = 50
>         sasl.kerberos.ticket.renew.window.factor = 0.8
>         max.partition.fetch.bytes = 2097152
>         bootstrap.servers = [myBrokerList]
>         retry.backoff.ms = 100
>         sasl.kerberos.kinit.cmd = /usr/bin/kinit
>         sasl.kerberos.service.name = null
>         sasl.kerberos.ticket.renew.jitter = 0.05
>         ssl.keystore.type = JKS
>         ssl.trustmanager.algorithm = PKIX
>         enable.auto.commit = false
>         ssl.key.password = null
>         fetch.max.wait.ms = 1000
>         sasl.kerberos.min.time.before.relogin = 60000
>         connections.max.idle.ms = 540000
>         ssl.truststore.password = null
>         session.timeout.ms = 30000
>         metrics.num.samples = 2
>         client.id = 
>         ssl.endpoint.identification.algorithm = null
>         key.deserializer = class sf.kafka.VoidDeserializer
>         ssl.protocol = TLS
>         check.crcs = true
>         request.timeout.ms = 40000
>         ssl.provider = null
>         ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>         ssl.keystore.location = null
>         heartbeat.interval.ms = 3000
>         auto.commit.interval.ms = 5000
>         receive.buffer.bytes = 32768
>         ssl.cipher.suites = null
>         ssl.truststore.type = JKS
>         security.protocol = PLAINTEXT
>         ssl.truststore.location = null
>         ssl.keystore.password = null
>         ssl.keymanager.algorithm = SunX509
>         metrics.sample.window.ms = 30000
>         fetch.min.bytes = 512
>         send.buffer.bytes = 131072
>         auto.offset.reset = earliest
> We use the consumer.assign() feature to assign a list of partitions and call 
> poll in a loop (a minimal construction sketch follows the list below). We 
> have the following setup:
> 1. The messages have no key, and we use the byte array deserializer to get 
> the values as raw byte arrays.
> 2. The messages themselves are on average about 75 bytes. We get this number 
> by dividing the Kafka broker's bytes-in metric by its messages-in metric.
> 3. Each consumer is assigned about 64 partitions of the same topic spread 
> across three brokers.
> 4. We currently get very few messages per second, around 1-2 messages across 
> all partitions on a client.
> 5. We have no compression on the topic.
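> For reference, here is a stripped-down sketch of how we construct and assign 
> such a consumer. This is illustrative rather than our exact code: the broker 
> list and topic name are placeholders, and only a few of the non-default 
> config values are shown.
> import java.util.ArrayList;
> import java.util.List;
> import java.util.Properties;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.common.TopicPartition;
>
> public class AssignExample {
>     public static void main(String[] args) {
>         Properties props = new Properties();
>         // Placeholder broker list; ours spans three brokers.
>         props.put("bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092");
>         props.put("group.id", "myGroup.id");
>         props.put("enable.auto.commit", "false");
>         props.put("auto.offset.reset", "earliest");
>         props.put("fetch.min.bytes", "512");
>         props.put("fetch.max.wait.ms", "1000");
>         props.put("max.partition.fetch.bytes", "2097152");
>         props.put("key.deserializer", "sf.kafka.VoidDeserializer");
>         props.put("value.deserializer",
>             "org.apache.kafka.common.serialization.ByteArrayDeserializer");
>
>         KafkaConsumer<Void, byte[]> consumer = new KafkaConsumer<>(props);
>
>         // Manually assign all 64 partitions of the topic; assign() bypasses
>         // consumer-group rebalancing entirely.
>         List<TopicPartition> partitions = new ArrayList<>();
>         for (int partition = 0; partition < 64; partition++) {
>             partitions.add(new TopicPartition("myTopic", partition)); // placeholder topic
>         }
>         consumer.assign(partitions);
>     }
> }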
> Our run loop looks something like this:
> while (isRunning()) {
>     ConsumerRecords<Void, byte[]> records = null;
>     try {
>         // Here timeout is about 10 seconds, so it is pretty big.
>         records = consumer.poll(timeout);
>     } catch (Exception e) {
>         // This never hits for us.
>         logger.error("Exception polling Kafka ", e);
>         records = null;
>     }
>     if (records != null) {
>         for (ConsumerRecord<Void, byte[]> record : records) {
>             // The handler puts the byte array on a very fast ring buffer,
>             // so it barely takes any time.
>             handler.handleMessage(ByteBuffer.wrap(record.value()));
>         }
>     }
> }
> With this setup our performance took a horrendous hit as soon as we started 
> this one thread that just polls Kafka in a loop.
> I profiled the application using Java Mission Control and have a few insights.
> 1. There doesn't seem to be a single hotspot. The consumer just ends up using 
> a lot of CPU for handling such a low number of messages. Our process was 
> using 16% CPU before we added a single consumer, and it went to 25% and above 
> after. That's an increase of over 50% from a single consumer getting a 
> single-digit number of small messages per second. Here is an attachment of 
> the CPU usage breakdown in the consumer (the namespace is different because 
> we shade the Kafka jar before using it): http://imgur.com/BxWs9Q0 So 20.54% 
> of our entire process CPU is used on polling these 64 partitions (across 3 
> brokers) with a single-digit number of roughly 70-80 byte messages per 
> second. We've used bigger timeouts (around 100 seconds) and that doesn't seem 
> to make much of a difference either.
> 2. It also seems like Kafka throws a ton of EOFExceptions. I am not sure 
> whether this is expected, but it seems like it would completely kill 
> performance. Here is the exception tab of Java Mission Control: 
> http://imgur.com/X3KSn37 That is 1.8 million exceptions over a period of 3 
> minutes, which is about 10 thousand exceptions per second! The exception 
> stack traces show that they originate from the poll call. I don't understand 
> how it can throw so many exceptions given that I call poll with a timeout of 
> 10 seconds and get a single-digit number of messages per second. The 
> exceptions seem to be thrown from here: 
> https://github.com/apache/kafka/blob/0.9.0/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java#L236 
> (see the sketch after this list for why this pattern is so expensive).
> 3. The single thread seems to allocate a lot too. It is responsible for 
> 17.87% of our entire JVM allocation rate; during other runs it has gone up to 
> 20%. Most of what it allocates seems to be those same EOFExceptions. Here is 
> a chart showing the single thread's allocation proportion: 
> http://imgur.com/GNUJQsz Here is a chart that shows a breakdown of the 
> allocations: http://imgur.com/YjCXljE About 20% of the allocations are the 
> EOFExceptions. But given that those 20% amount to around 10k/second, the 
> thread itself is allocating about 50k objects/second, which seems excessive 
> given how few messages we are getting.
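> To illustrate why that exception volume matters: every thrown exception is a 
> fresh allocation, and by default the JVM also fills in the full stack trace 
> at construction time. The toy sketch below (our own illustration of the 
> general end-of-data-via-exception pattern, not Kafka's actual code) shows the 
> shape of the problem.
> import java.io.EOFException;
> import java.nio.ByteBuffer;
>
> public class EofCost {
>     // Toy reader that signals "no more data" by throwing, roughly the way
>     // the record iterator's underlying stream does when the fetch buffer
>     // runs dry mid-record.
>     static int readInt(ByteBuffer buf) throws EOFException {
>         if (buf.remaining() < 4) {
>             // Allocating the exception and filling in its stack trace is
>             // the expensive part when it happens ~10k times per second.
>             throw new EOFException();
>         }
>         return buf.getInt();
>     }
>
>     public static void main(String[] args) {
>         ByteBuffer empty = ByteBuffer.allocate(0);
>         long start = System.nanoTime();
>         int thrown = 0;
>         for (int i = 0; i < 100_000; i++) {
>             try {
>                 readInt(empty);
>             } catch (EOFException e) {
>                 thrown++; // end-of-data reported via exception, not a flag
>             }
>         }
>         long elapsedMs = (System.nanoTime() - start) / 1_000_000;
>         System.out.println(thrown + " EOFExceptions in " + elapsedMs + " ms");
>     }
> }
> A return value or a remaining-bytes check would make the same control flow 
> allocation-free, which is presumably why this shows up so prominently in 
> both the CPU and allocation profiles.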
> As a comparison, we also run a wrapper over the old SimpleConsumer that gets 
> a lot more data (30 thousand 70-byte messages/sec on a different topic) and 
> it is able to handle that load without much trouble. At this moment we are 
> completely puzzled by this performance. At least some part of it seems to be 
> due to the crazy volume of exceptions, but the CPU profiling breakdown 
> suggests that there are plenty of other causes, including the 
> Fetcher.initFetches() call and the ConsumerNetworkClient.poll() call. Note: 
> our messages all seem to be making it through. We haven't measured the 
> end-to-end latency. The exceptions are caught inside Kafka's stack and never 
> bubble up to us.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
