Bozhidar Bozhanov created KAFKA-10474:
-----------------------------------------
Summary: Kafka Java client introduces CPU overhead when there are
many consumers
Key: KAFKA-10474
URL: https://issues.apache.org/jira/browse/KAFKA-10474
Project: Kafka
Issue Type: Bug
Components: clients
Affects Versions: 2.4.1
Reporter: Bozhidar Bozhanov
We are using the Kafka Java client (version 2.4.1), and we started noticing a
gradual increase in the CPU usage of our production instances. The usage was
not correlated with load; instead, it increased in steps whenever we acquired a
new customer. We realized the likely cause was the steadily growing number of
consumers, as each registration adds one consumer per account by default.
We did some profiling by running a local sample with 3000 consumers and
noticed that the JmxReporter generates some CPU overhead. Unfortunately,
there's no way to disable it, so we created a class with the same name and
package on the classpath and let the classloader pick it up first. That alone
led to a drop from 60% to 40% sustained CPU usage.
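A rough way to see how much JMX state the consumers create is to count the MBeans in the `kafka.consumer` domain of the platform MBeanServer, which is where `KafkaConsumer` registers its metrics through `JmxReporter`. The sketch below uses only JDK JMX APIs; the class and method names are hypothetical, and the MBean count is a proxy for JMX bookkeeping, not a measurement of CPU usage.

```java
import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

// Hypothetical diagnostic helper: counts MBeans registered under one JMX domain.
final class MBeanCounter {
    static int countMBeans(String domain) {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        try {
            // "<domain>:*" is an ObjectName pattern matching every MBean in that domain
            return server.queryNames(new ObjectName(domain + ":*"), null).size();
        } catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException("invalid JMX domain: " + domain, e);
        }
    }

    public static void main(String[] args) {
        // KafkaConsumer instances register their metrics MBeans under "kafka.consumer"
        System.out.println("kafka.consumer MBeans: " + countMBeans("kafka.consumer"));
    }
}
```

Called periodically from inside the application (or compared before and after creating consumers), this shows how the number of registered consumer MBeans grows with the number of consumers.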
We then changed our code to create consumers only for active customers (so
there are no more idle consumers). Once the idle consumers were removed,
sustained CPU usage dropped to 8%.
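Creating consumers only for active customers could be organized as a registry that creates a consumer handle lazily when an account becomes active and closes it when the account goes idle. This is a minimal sketch under assumptions: `ActiveConsumerRegistry`, `onActive` and `onIdle` are hypothetical names, this report does not say how an account is classified as active, and `C` stands for whatever object owns an account's poll loop. Because `KafkaConsumer` is not thread-safe, that handle's `close()` should stop its loop (for example via the thread-safe `consumer.wakeup()`) rather than close the consumer from another thread.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical sketch: keeps one consumer handle per active account only.
final class ActiveConsumerRegistry<C extends AutoCloseable> {
    private final Map<String, C> consumers = new ConcurrentHashMap<>();
    // Creates the handle for an account, e.g. one that starts its poll loop (assumption)
    private final Function<String, C> factory;

    ActiveConsumerRegistry(Function<String, C> factory) {
        this.factory = factory;
    }

    // Called when an account becomes active; creates its consumer at most once.
    C onActive(String accountId) {
        return consumers.computeIfAbsent(accountId, factory);
    }

    // Called when an account goes idle; removes and closes its consumer, if any.
    void onIdle(String accountId) {
        C consumer = consumers.remove(accountId);
        if (consumer == null) {
            return;
        }
        try {
            consumer.close();
        } catch (Exception e) {
            throw new IllegalStateException("failed to close consumer for " + accountId, e);
        }
    }

    // Number of consumers currently alive (one per active account).
    int size() {
        return consumers.size();
    }
}
```

`ConcurrentHashMap.computeIfAbsent` applies the factory at most once per key, so repeated or concurrent `onActive` calls for the same account do not create duplicate consumers.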
Apart from the JMX metrics reporter, I don't know what causes idle consumers
to generate such CPU overhead. In case it helps, our consumer does this:
{code:java}
try (KafkaConsumer<Integer, Message> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Collections.singletonList(applicationId.toString()));
    while (true) {
        try {
            synchronized (applicationId.toString().intern()) {
                ConsumerRecords<Integer, Message> messages = consumer.poll(Duration.ofMillis(2000));
                List<Message> entries = StreamSupport.stream(messages.spliterator(), false)
                        .map(ConsumerRecord::value)
                        .collect(Collectors.toList());
                // idle consumers don't get here, because they have no entries to process
                if (!entries.isEmpty()) {
                    try {
                        // business logic (elided)
                        consumer.commitSync();
                    } catch (ExecutionException ex) {
                        // revert to the last successful batch start offset;
                        // poll() advances its internal position, which is different from the committed offset
                        consumer.committed(consumer.assignment()).forEach(consumer::seek);
                    }
                }
            }
            // we want to process more records at once (due to business logic specifics),
            // so we sleep for a preconfigured period of time, in our case 10 seconds
            Thread.sleep(kafkaSleepMillis);
        } catch (Exception ex) {
            logger.error("Exception while trying to get data from kafka consumer for appId={}, stopping.",
                    applicationId, ex);
            break;
        }
    }
} catch (Exception ex) {
    logger.error("Failed to run kafka consumer", ex);
}
{code}
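For an idle consumer, each iteration of the loop above blocks in `poll()` for about 2 seconds and then sleeps for 10 seconds. In addition, each consumer in a group has a background heartbeat thread. A back-of-envelope estimate of the periodic work for 3000 idle consumers, assuming the loop timings above and the default `heartbeat.interval.ms` of 3000 ms, is sketched below. The class and method names are hypothetical, and these are estimates, not profiling results.

```java
// Hypothetical back-of-envelope estimate of periodic work done by idle consumers.
final class IdleConsumerLoad {
    // Loop iterations per second across all consumers when every poll() returns nothing:
    // each iteration blocks in poll() for pollTimeoutMs and then sleeps for sleepMs.
    static double idleLoopIterationsPerSecond(int consumers, long pollTimeoutMs, long sleepMs) {
        return consumers * 1000.0 / (pollTimeoutMs + sleepMs);
    }

    // Heartbeats per second, assuming each consumer's heartbeat thread
    // sends one heartbeat every heartbeatIntervalMs.
    static double heartbeatsPerSecond(int consumers, long heartbeatIntervalMs) {
        return consumers * 1000.0 / heartbeatIntervalMs;
    }

    public static void main(String[] args) {
        int consumers = 3000;
        System.out.println(idleLoopIterationsPerSecond(consumers, 2000, 10_000)); // 250.0
        System.out.println(heartbeatsPerSecond(consumers, 3000)); // 1000.0
    }
}
```

Under these assumptions, the idle consumers together perform roughly 250 loop iterations and 1000 heartbeats per second, on top of the per-consumer threads themselves, even though none of them ever receives a record.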
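The comment in the `catch (ExecutionException ex)` branch relies on the fact that `poll()` advances the consumer's position independently of the committed offset, which `commitSync()` (without arguments) sets to the offsets returned by the last `poll()`. The toy model below illustrates that distinction for a single partition. `PartitionCursor` and its methods are hypothetical and are not part of the Kafka client; in the real client the corresponding calls are `poll()`, `commitSync()`, `committed(...)` and `seek(...)`.

```java
// Toy model (not Kafka code) of one partition's consumer position vs. committed offset.
final class PartitionCursor {
    private long position;  // next offset poll() will return; advanced by poll()
    private long committed; // last offset stored by commit()

    // Simulates poll() returning up to maxRecords offsets; this advances the position.
    long[] poll(long logEndOffset, int maxRecords) {
        long end = Math.min(logEndOffset, position + maxRecords);
        long[] offsets = new long[(int) (end - position)];
        for (int i = 0; i < offsets.length; i++) {
            offsets[i] = position + i;
        }
        position = end;
        return offsets;
    }

    // Simulates commitSync(): the committed offset becomes the current position.
    void commit() {
        committed = position;
    }

    // Simulates seeking back to the committed offset after a failed batch.
    void seekToCommitted() {
        position = committed;
    }

    long position() {
        return position;
    }

    long committed() {
        return committed;
    }
}
```

After a failed batch, the position is ahead of the committed offset, so without the seek the next `poll()` would skip the failed records; seeking back to the committed offset makes the next `poll()` return them again.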
--
This message was sent by Atlassian Jira
(v8.3.4#803005)