[ https://issues.apache.org/jira/browse/NIFI-14598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17954976#comment-17954976 ]

ASF subversion and git services commented on NIFI-14598:
--------------------------------------------------------

Commit eaca86ab6fb4b8eb96c941e5e45571720973252b in nifi's branch 
refs/heads/main from Mark Payne
[ https://gitbox.apache.org/repos/asf?p=nifi.git;h=eaca86ab6f ]

NIFI-14598: Do not make the Kafka consumer available to other threads until done 
using it, by ensuring that it is not added back to the pool until the session 
commit completes (successfully or not). Also ensure that no more than Max 
Concurrent Tasks consumers are active.

NIFI-14598: If ConsumeKafka encounters an Exception while consuming, ensure 
that the ProcessSession is rolled back so that already-created FlowFiles are 
not transferred before the offsets are reverted. Also ensure that the Active 
Consumer counter cannot be decremented more than once for the same consumer

NIFI-14598: Fixed bug in which ConsumeKafka duplicated the last record upon 
restart

This closes #9971.

Signed-off-by: Peter Turcsanyi <[email protected]>


> ConsumeKafka makes KafkaConsumer object reusable before it should
> -----------------------------------------------------------------
>
>                 Key: NIFI-14598
>                 URL: https://issues.apache.org/jira/browse/NIFI-14598
>             Project: Apache NiFi
>          Issue Type: Bug
>          Components: Extensions
>            Reporter: Mark Payne
>            Assignee: Mark Payne
>            Priority: Major
>          Time Spent: 1h
>  Remaining Estimate: 0h
>
> ConsumeKafka maintains a BlockingQueue<KafkaConsumerService> in order to 
> provide pooling of consumers. After it pulls some data and transfers it to 
> the 'success' relationship, the Processor calls 
> {{ProcessSession.commitAsync(Runnable, Consumer<Throwable>)}} so that on 
> success it commits the offsets, and on failure it rolls back. It then adds 
> the KafkaConsumer back to the {{BlockingQueue}} in a {{finally}} block. 
> This causes problems because the consumer is not thread-safe: it is made 
> available for another thread to pull from the queue while the 
> ConsumerService is still exposed to a separate thread via the references 
> held by the commitAsync callbacks.
> We need to ensure that we do not re-queue the consumer until after the 
> session commit completes (successfully or otherwise). Additionally, the 
> onTrigger method may get called again before the async commit completes, 
> which would result in creating a new Consumer. To prevent this, we need to 
> keep track of how many active consumers there are and never go above the max 
> number of concurrent tasks.
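
The two-part fix described above (return the consumer to the pool only from the commit callback, and bound the number of live consumers by the concurrent-task count) can be sketched roughly as follows. This is an illustrative sketch only; the class and method names here are hypothetical stand-ins, not the actual NiFi processor internals:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the pooling discipline described in the issue.
public class ConsumerPoolSketch {

    // Stand-in for a non-thread-safe KafkaConsumer wrapper.
    static class PooledConsumer {
        // Guard so the active count is decremented at most once per lease,
        // even if the commit callback fires release() more than once.
        final AtomicBoolean released = new AtomicBoolean(false);
    }

    private final BlockingQueue<PooledConsumer> pool;
    private final AtomicInteger activeCount = new AtomicInteger(0);
    private final int maxConcurrentTasks;

    public ConsumerPoolSketch(final int maxConcurrentTasks) {
        this.maxConcurrentTasks = maxConcurrentTasks;
        this.pool = new ArrayBlockingQueue<>(maxConcurrentTasks);
    }

    // Lease a consumer; create a new one only if the active count stays
    // within the configured number of concurrent tasks. Returns null when
    // every consumer is still held by a pending async commit, in which case
    // onTrigger should yield instead of creating another consumer.
    public PooledConsumer obtain() {
        if (activeCount.incrementAndGet() > maxConcurrentTasks) {
            activeCount.decrementAndGet();
            return null;
        }
        PooledConsumer consumer = pool.poll();
        if (consumer == null) {
            consumer = new PooledConsumer();
        } else {
            consumer.released.set(false);
        }
        return consumer;
    }

    // Called from the commitAsync success/failure callback: only now is the
    // consumer re-queued and the active count decremented, and the CAS makes
    // a double release a no-op.
    public void release(final PooledConsumer consumer) {
        if (consumer.released.compareAndSet(false, true)) {
            pool.offer(consumer);
            activeCount.decrementAndGet();
        }
    }

    public int getActiveCount() {
        return activeCount.get();
    }
}
```

With one concurrent task, a second obtain() before release() returns null rather than creating a second consumer, and a duplicate release() leaves the counter untouched.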



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
