[ 
https://issues.apache.org/jira/browse/NIFI-14598?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Peter Turcsanyi resolved NIFI-14598.
------------------------------------
    Fix Version/s: 2.5.0
       Resolution: Fixed

> ConsumeKafka makes KafkaConsumer object reusable before it should
> -----------------------------------------------------------------
>
>                 Key: NIFI-14598
>                 URL: https://issues.apache.org/jira/browse/NIFI-14598
>             Project: Apache NiFi
>          Issue Type: Bug
>          Components: Extensions
>            Reporter: Mark Payne
>            Assignee: Mark Payne
>            Priority: Major
>             Fix For: 2.5.0
>
>          Time Spent: 1h 10m
>  Remaining Estimate: 0h
>
> ConsumeKafka maintains a BlockingQueue<KafkaConsumerService> in order to 
> provide pooling of consumers. After it pulls some data and transfers it to 
> the 'success' relationship, the Processor calls 
> {{ProcessSession.commitAsync(Runnable, Consumer<Throwable>)}} so that on 
> success it commits the offsets and on failure it rolls back. It then, in a 
> {{finally}} block, adds the KafkaConsumer back to the {{BlockingQueue}}. 
> This causes a problem: the consumer is not thread-safe, yet it is 
> immediately made available for another thread to pull from the queue while 
> the commitAsync callbacks still hold a reference to the same 
> ConsumerService, exposing it to a separate thread.
> We need to ensure that we do not re-queue the consumer until after the 
> session commit completes (successfully or otherwise). Additionally, the 
> onTrigger method may get called again before the async commit completes, 
> which would result in creating a new Consumer. To prevent this, we need to 
> keep track of how many active consumers there are and never go above the max 
> number of concurrent tasks.
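
The fix described above can be sketched as follows. This is a minimal, self-contained illustration of the pattern, not the actual NiFi code: {{ConsumerService}}, {{ConsumerPool}}, and {{SessionLike}} are hypothetical stand-ins, and a {{Semaphore}} is one way (assumed here) to cap the number of active consumers at the max concurrent tasks. The key points are that the consumer is re-queued only from the commit callbacks, never from a {{finally}} block, and that no new consumer is created while one is still in flight.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.function.Consumer;

// Hypothetical stand-in for KafkaConsumerService (not thread-safe).
class ConsumerService {
    void poll() { /* fetch records from Kafka */ }
}

public class ConsumerPool {
    private final BlockingQueue<ConsumerService> pool;
    private final Semaphore permits; // bounds active consumers to max concurrent tasks

    public ConsumerPool(int maxConcurrentTasks) {
        this.pool = new ArrayBlockingQueue<>(maxConcurrentTasks);
        this.permits = new Semaphore(maxConcurrentTasks);
    }

    // Returns null when all permitted consumers are already in flight,
    // which prevents creating a new Consumer while an async commit is pending.
    public ConsumerService borrow() {
        if (!permits.tryAcquire()) {
            return null; // onTrigger should yield rather than create another consumer
        }
        final ConsumerService service = pool.poll();
        return service != null ? service : new ConsumerService();
    }

    // Called only from the commitAsync callbacks, never from a finally block,
    // so the consumer does not become reusable until the session commit
    // completes (successfully or otherwise).
    public void release(final ConsumerService service) {
        pool.offer(service);
        permits.release();
    }

    // Sketch of the corrected onTrigger flow.
    public void onTrigger(final SessionLike session) {
        final ConsumerService service = borrow();
        if (service == null) {
            return; // at the max number of active consumers; try again later
        }
        service.poll();
        session.commitAsync(
            () -> release(service),               // success: offsets committed, safe to re-queue
            (Throwable t) -> release(service));   // failure: rolled back, safe to re-queue
    }

    // Minimal stand-in for ProcessSession.commitAsync(Runnable, Consumer<Throwable>).
    public interface SessionLike {
        void commitAsync(Runnable onSuccess, Consumer<Throwable> onFailure);
    }
}
```

With a pool of size 1, a second borrow() before the first consumer's commit callback has run returns null instead of handing out (or creating) a consumer, which is the invariant the fix enforces.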



--
This message was sent by Atlassian Jira
(v8.20.10#820010)