Mark Payne created NIFI-14656:
---------------------------------
Summary: ConsumeKafka does not properly reuse Kafka Consumer if
configured not to commit offsets
Key: NIFI-14656
URL: https://issues.apache.org/jira/browse/NIFI-14656
Project: Apache NiFi
Issue Type: Bug
Components: Core Framework
Reporter: Mark Payne
Assignee: Mark Payne
The {{commitOffsets}} method currently looks like this:
{code:java}
private void commitOffsets(final KafkaConsumerService consumerService,
        final OffsetTracker offsetTracker, final PollingContext pollingContext) {
    if (!commitOffsets) {
        return;
    }
    try {
        consumerService.commit(offsetTracker.getPollingSummary(pollingContext));
        consumerServices.offer(consumerService);
        getLogger().debug("Committed offsets for Kafka Consumer Service");
    } catch (final Exception e) {
        close(consumerService);
        getLogger().error("Failed to commit offsets for Kafka Consumer Service", e);
    }
}
{code}
The first thing the method does is check whether {{commitOffsets}} is false
and, if so, return. On that path it never returns the ConsumerService to the
{{consumerServices}} queue, so the consumer can never be reused. While we
want to avoid calling {{consumerService.commit}} in this case, we must still
call {{consumerServices.offer}} so that the consumer is reused.
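
One possible shape for the fix is sketched below as a self-contained example, not the actual patch. {{StubConsumerService}} and {{ConsumerReuseSketch}} are hypothetical stand-ins for the real NiFi classes, and the commit call is simplified to take no arguments. The only point is that the early-return path still offers the consumer back to the queue.

```java
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-ins for the types named in this ticket; not the real NiFi classes.
class ConsumerReuseSketch {

    /** Minimal stand-in for KafkaConsumerService: counts commits and records close. */
    static class StubConsumerService {
        int commits = 0;
        boolean closed = false;
        final boolean failOnCommit;

        StubConsumerService(final boolean failOnCommit) {
            this.failOnCommit = failOnCommit;
        }

        void commit() {
            if (failOnCommit) {
                throw new IllegalStateException("simulated commit failure");
            }
            commits++;
        }

        void close() {
            closed = true;
        }
    }

    final Queue<StubConsumerService> consumerServices = new LinkedBlockingQueue<>();
    final boolean commitOffsets;

    ConsumerReuseSketch(final boolean commitOffsets) {
        this.commitOffsets = commitOffsets;
    }

    void commitOffsets(final StubConsumerService consumerService) {
        if (!commitOffsets) {
            // Skip the commit, but still hand the consumer back for reuse.
            consumerServices.offer(consumerService);
            return;
        }
        try {
            consumerService.commit();
            consumerServices.offer(consumerService);
        } catch (final Exception e) {
            // As in the current method, a consumer whose commit failed is closed, not pooled.
            consumerService.close();
        }
    }
}
```

With {{commitOffsets}} set to false, the consumer ends up back in the queue without any commit having been issued. The failure path is unchanged: the consumer is closed and not re-queued.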