Bozhidar Bozhanov created KAFKA-10474:
-----------------------------------------

             Summary: Kafka Java client introduces CPU overhead when there are many consumers
                 Key: KAFKA-10474
                 URL: https://issues.apache.org/jira/browse/KAFKA-10474
             Project: Kafka
          Issue Type: Bug
          Components: clients
    Affects Versions: 2.4.1
            Reporter: Bozhidar Bozhanov


We are using the Kafka Java client (version 2.4.1) and we started noticing a
gradual increase in the CPU usage of our production instances. The usage was
not correlated with load; instead, it increased in steps whenever we acquired a
new customer. We realized that the likely cause is the steadily growing number
of consumers, since each registration adds one consumer per account by default.


We did some profiling by running a local sample with 3000 consumers and noticed
that the JmxReporter generates noticeable CPU overhead. Unfortunately, there is
no way to disable it, so we created a class with the same name and package,
placed it on the classpath, and let the classloader pick it up first. That
alone led to a drop from 60% to 40% sustained CPU usage.


We then changed our code to create consumers only for active customers, so
there are no more idle consumers. With the idle consumers removed, CPU usage
dropped to 8% sustained.
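
The change above amounts to creating consumers lazily, per active customer. A
minimal sketch, assuming a hypothetical LazyConsumerRegistry class and a
caller-supplied factory (neither is part of the Kafka client API, and this is
not the production code): a consumer is created the first time a customer
becomes active and closed when the customer goes idle, so idle accounts no
longer hold a consumer at all.

{code:java}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/**
 * Hypothetical registry: one consumer per *active* customer.
 * C is whatever the application uses as a consumer (for example a
 * KafkaConsumer, which is Closeable); the factory is an assumption.
 */
class LazyConsumerRegistry<K, C extends AutoCloseable> {
    private final Map<K, C> active = new ConcurrentHashMap<>();
    private final Function<K, C> factory;

    LazyConsumerRegistry(Function<K, C> factory) {
        this.factory = factory;
    }

    /** Returns the customer's consumer, creating it only on first activation. */
    C activate(K customerId) {
        return active.computeIfAbsent(customerId, factory);
    }

    /** Removes and closes the consumer of a customer that went idle, if any. */
    void deactivate(K customerId) throws Exception {
        C consumer = active.remove(customerId);
        if (consumer != null) {
            consumer.close();
        }
    }

    /** Number of consumers currently alive. */
    int activeCount() {
        return active.size();
    }
}
{code}

In a real setup the factory would presumably wrap new KafkaConsumer<>(props)
plus the subscribe() call, and deactivate() would be driven by whatever signal
marks a customer as idle (both are assumptions here). Since KafkaConsumer is
not thread-safe, close() should run on the thread that polls the consumer, or
after that thread has been interrupted with wakeup().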

Apart from the JMX metrics reporter, I don't know what makes idle consumers
generate such CPU overhead. In case it helps, our consumer does the following:

{code:java}
try (KafkaConsumer<Integer, PendingAuditLogEntry> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Collections.singletonList(applicationId.toString()));
    while (true) {
        try {
            synchronized (applicationId.toString().intern()) {
                ConsumerRecords<Integer, PendingAuditLogEntry> messages =
                        consumer.poll(Duration.ofMillis(2000));
                List<PendingAuditLogEntry> entries = StreamSupport.stream(messages.spliterator(), false)
                        .map(ConsumerRecord::value)
                        .collect(Collectors.toList());
                // idle consumers never enter this branch, because they have no entries to process
                if (!entries.isEmpty()) {
                    try {
                        // logic
                        consumer.commitSync();
                    } catch (ExecutionException ex) {
                        // revert to the last successful batch start offset:
                        // poll() advances its internal position, which is different from the committed offset
                        consumer.committed(consumer.assignment()).forEach(consumer::seek);
                    }
                }
            }
            // we want to process more records at once (due to business logic specifics),
            // so we sleep for a preconfigured period of time, in our case 10 seconds
            Thread.sleep(kafkaSleepMillis);
        } catch (Exception ex) {
            logger.error("Exception while trying to get data from kafka consumer for appId={}, stopping.",
                    applicationId, ex);
            break;
        }
    }
} catch (Exception ex) {
    logger.error("Failed to run kafka consumer", ex);
}{code}
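
The error handling in the loop depends on the difference between the fetch
position that poll() advances and the committed offset. The toy model below is
a hypothetical single-partition simulation, not the Kafka API: poll() moves a
position forward, commit() copies it into the committed offset (roughly what
commitSync() with no arguments does for the last polled records), and
rewindToCommitted() mirrors consumer.committed(...).forEach(consumer::seek), so
a failed batch is delivered again on the next poll.

{code:java}
import java.util.ArrayList;
import java.util.List;

/** Hypothetical one-partition model of fetch position vs. committed offset. */
class PartitionCursor {
    private final List<String> log;
    private long position;   // next offset that poll() will return
    private long committed;  // offset stored by the last successful commit

    PartitionCursor(List<String> log) {
        this.log = log;
    }

    /** Returns up to maxRecords records and advances the position past them. */
    List<String> poll(int maxRecords) {
        int from = (int) position;
        int to = Math.min(log.size(), from + maxRecords);
        position = to;
        return new ArrayList<>(log.subList(from, to));
    }

    /** Commits the current position, i.e. everything returned so far. */
    void commit() {
        committed = position;
    }

    /** Seeks back to the committed offset so the failed batch is re-read. */
    void rewindToCommitted() {
        position = committed;
    }

    long position() { return position; }

    long committed() { return committed; }
}
{code}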

--
This message was sent by Atlassian Jira
(v8.3.4#803005)