Mark Payne created NIFI-14598:
---------------------------------
Summary: ConsumeKafka makes KafkaConsumer object reusable before
it should
Key: NIFI-14598
URL: https://issues.apache.org/jira/browse/NIFI-14598
Project: Apache NiFi
Issue Type: Bug
Components: Extensions
Reporter: Mark Payne
Assignee: Mark Payne
ConsumeKafka maintains a BlockingQueue<KafkaConsumerService> in order to
provide pooling of consumers. After it pulls some data and transfers it to the
'success' relationship, the Processor then calls
{{ProcessSession.commitAsync(Runnable, Consumer<Throwable>)}} such that on
success, it commits the offsets and on failure, it rolls back. It then, in a
{{finally}} block, adds the KafkaConsumer back to the {{BlockingQueue}}.
This causes problems because the KafkaConsumer is not thread-safe: the consumer
is made available for another thread to pull from the queue while, at the same
time, the ConsumerService is still referenced by the commitAsync callbacks and
is therefore exposed to a separate thread.
We need to ensure that we do not re-queue the consumer until the session commit
completes (successfully or otherwise). Additionally, the onTrigger method may be
called again before the async commit completes, which would result in creating a
new Consumer. To prevent this, we need to track how many active consumers exist
and never exceed the maximum number of concurrent tasks.
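A minimal sketch of the intended fix, using hypothetical {{KafkaConsumerService}} and {{Session}} interfaces (the real NiFi types differ): the consumer is re-queued only from inside the commitAsync callbacks, never from a {{finally}} block, and a {{Semaphore}} caps the number of live consumers at the concurrent-task count so onTrigger cannot create an extra consumer while a commit is pending.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.function.Consumer;

class ConsumerPoolSketch {
    // Hypothetical stand-in for NiFi's KafkaConsumerService; illustrative only.
    interface KafkaConsumerService {
        void commitOffsets();
        void rollback();
    }

    // Hypothetical stand-in for ProcessSession.commitAsync(Runnable, Consumer<Throwable>).
    interface Session {
        void commitAsync(Runnable onSuccess, Consumer<Throwable> onFailure);
    }

    private final BlockingQueue<KafkaConsumerService> pool;
    // Bounds the number of live consumers at the max concurrent tasks.
    private final Semaphore permits;

    ConsumerPoolSketch(final int maxConcurrentTasks) {
        this.pool = new ArrayBlockingQueue<>(maxConcurrentTasks);
        this.permits = new Semaphore(maxConcurrentTasks);
    }

    /** Returns a pooled consumer, a new one if under the cap, or null if the cap is reached. */
    KafkaConsumerService obtain() {
        final KafkaConsumerService service = pool.poll();
        if (service != null) {
            return service;
        }
        if (!permits.tryAcquire()) {
            return null; // all consumers busy or awaiting commit; caller should yield
        }
        return newConsumer();
    }

    /** Re-queues the consumer only once the async commit completes, success or failure. */
    void processAndCommit(final KafkaConsumerService service, final Session session) {
        session.commitAsync(
            () -> {
                service.commitOffsets();
                pool.offer(service); // safe to reuse only now
            },
            t -> {
                service.rollback();
                pool.offer(service);
            });
    }

    private KafkaConsumerService newConsumer() {
        return new KafkaConsumerService() {
            public void commitOffsets() { }
            public void rollback() { }
        };
    }
}
```

Because the re-queue happens inside the callbacks, a second thread can never poll the consumer from the queue while the commit callbacks still hold a reference to it, and the semaphore guarantees a repeated onTrigger call cannot create a consumer beyond the configured task count.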
--
This message was sent by Atlassian Jira
(v8.20.10#820010)