Mark Payne created NIFI-14598:
---------------------------------

             Summary: ConsumeKafka makes KafkaConsumer object reusable before it should
                 Key: NIFI-14598
                 URL: https://issues.apache.org/jira/browse/NIFI-14598
             Project: Apache NiFi
          Issue Type: Bug
          Components: Extensions
            Reporter: Mark Payne
            Assignee: Mark Payne


ConsumeKafka maintains a {{BlockingQueue<KafkaConsumerService>}} in order to 
provide pooling of consumers. After it pulls some data and transfers it to the 
'success' relationship, the Processor calls 
{{ProcessSession.commitAsync(Runnable, Consumer<Throwable>)}} so that on 
success it commits the offsets and on failure it rolls back. It then, in a 
{{finally}} block, adds the KafkaConsumerService back to the {{BlockingQueue}}. 
This can cause problems because the consumer is not thread-safe: once it is 
back in the queue, another thread can pull it and use it, while the commitAsync 
callbacks still hold a reference to the same KafkaConsumerService and may use 
it from a separate thread at the same time.

We need to ensure that we do not re-queue the consumer until after the session 
commit completes (successfully or otherwise). Additionally, the onTrigger 
method may get called again before the async commit completes, which would 
result in creating a new Consumer. To prevent this, we need to keep track of 
how many active consumers there are and never go above the max number of 
concurrent tasks.
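
The approach described above could be sketched roughly as follows. This is 
only an illustration under stated assumptions, not the actual patch: 
{{DeferredReturnPool}}, {{AsyncSession}}, {{OffsetTracking}} and 
{{TriggerSketch}} are hypothetical names, and {{AsyncSession}} merely stands in 
for {{ProcessSession.commitAsync(Runnable, Consumer<Throwable>)}}. The pool 
counts leased instances so the total never exceeds the configured maximum, and 
an instance is re-queued only from within the commit callbacks.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Hypothetical pool: tracks leased instances and caps them at maxActive. */
final class DeferredReturnPool<T> {
    private final BlockingQueue<T> idle = new LinkedBlockingQueue<>();
    private final AtomicInteger active = new AtomicInteger();
    private final int maxActive;
    private final Supplier<T> factory;

    DeferredReturnPool(int maxActive, Supplier<T> factory) {
        this.maxActive = maxActive;
        this.factory = factory;
    }

    /** Returns an idle or newly created instance, or null if maxActive are leased. */
    T tryAcquire() {
        // Reserve a slot first so leased instances never exceed maxActive.
        while (true) {
            int current = active.get();
            if (current >= maxActive) {
                return null;
            }
            if (active.compareAndSet(current, current + 1)) {
                break;
            }
        }
        T pooled = idle.poll();
        if (pooled != null) {
            return pooled;
        }
        try {
            return factory.get();
        } catch (RuntimeException e) {
            active.decrementAndGet(); // give the slot back if creation fails
            throw e;
        }
    }

    /** Re-queues the instance first, then frees its slot. */
    void release(T instance) {
        idle.offer(instance);
        active.decrementAndGet();
    }

    int activeCount() { return active.get(); }
    int idleCount() { return idle.size(); }
}

/** Hypothetical stand-in for ProcessSession.commitAsync(Runnable, Consumer<Throwable>). */
interface AsyncSession {
    void commitAsync(Runnable onSuccess, Consumer<Throwable> onFailure);
}

/** Hypothetical view of the consumer operations used by the callbacks. */
interface OffsetTracking {
    void commitOffsets();
    void rollback();
}

final class TriggerSketch {
    /** Returns false when no consumer is available, e.g. all are awaiting a commit. */
    static <T extends OffsetTracking> boolean onTrigger(DeferredReturnPool<T> pool, AsyncSession session) {
        T lease = pool.tryAcquire();
        if (lease == null) {
            return false;
        }
        // No finally block here: the lease goes back only once the commit has finished.
        session.commitAsync(
            () -> { try { lease.commitOffsets(); } finally { pool.release(lease); } },
            failure -> { try { lease.rollback(); } finally { pool.release(lease); } });
        return true;
    }
}
```

In this sketch, {{release}} re-queues the instance before decrementing the 
counter, so a concurrent {{tryAcquire}} cannot reserve a slot, find the queue 
empty, and create an extra instance beyond the limit. A real fix would also 
need to release the lease if an exception is thrown before {{commitAsync}} is 
reached, which the sketch leaves out.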



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
