[
https://issues.apache.org/jira/browse/NIFI-14598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17954976#comment-17954976
]
ASF subversion and git services commented on NIFI-14598:
--------------------------------------------------------
Commit eaca86ab6fb4b8eb96c941e5e45571720973252b in nifi's branch
refs/heads/main from Mark Payne
[ https://gitbox.apache.org/repos/asf?p=nifi.git;h=eaca86ab6f ]
NIFI-14598: Do not make Kafka consumer available to other threads until done
using it, by ensuring that it is not added back to the pool until session
commit completes (successfully or not). Also, do not create more active
consumers than Max Concurrent Tasks.
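The first change can be illustrated with a minimal sketch, assuming a simplified model of the Processor. ConsumerService, FakeSession, and onTrigger below are hypothetical stand-ins, not the real NiFi or Kafka classes; FakeSession only mimics the shape of ProcessSession.commitAsync(Runnable, Consumer<Throwable>) by holding the callbacks until complete() is called. The point shown is that the consumer is offered back to the pool inside both callbacks rather than in a finally block of onTrigger.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

public class DeferredReturnPool {
    // Hypothetical stand-in for KafkaConsumerService (not the real NiFi class).
    static final class ConsumerService {
        void commitOffsets() { /* would commit the consumed offsets to Kafka */ }
        void rollback() { /* would seek back to the last committed offsets */ }
    }

    // Hypothetical session: stores the callbacks and runs them only when
    // complete() is called, mimicking callbacks that may run later.
    static final class FakeSession {
        private Runnable onSuccess;
        private Consumer<Throwable> onFailure;

        void commitAsync(Runnable onSuccess, Consumer<Throwable> onFailure) {
            this.onSuccess = onSuccess;
            this.onFailure = onFailure;
        }

        void complete(Throwable failure) {
            if (failure == null) {
                onSuccess.run();
            } else {
                onFailure.accept(failure);
            }
        }
    }

    final BlockingQueue<ConsumerService> pool = new LinkedBlockingQueue<>();

    void onTrigger(FakeSession session) {
        ConsumerService polled = pool.poll();
        final ConsumerService service = (polled != null) ? polled : new ConsumerService();
        // ... poll records and create FlowFiles with the service here ...
        session.commitAsync(
            () -> {
                try {
                    service.commitOffsets();
                } finally {
                    pool.offer(service); // returned only once the commit has succeeded
                }
            },
            failure -> {
                try {
                    service.rollback();
                } finally {
                    pool.offer(service); // or once the commit has failed
                }
            });
        // Deliberately no pool.offer(service) in a finally block here: the
        // callbacks above may still use the service after onTrigger returns.
    }

    public static void main(String[] args) {
        DeferredReturnPool processor = new DeferredReturnPool();
        FakeSession session = new FakeSession();
        processor.onTrigger(session);
        System.out.println("pooled before commit completes: " + processor.pool.size()); // 0
        session.complete(null);
        System.out.println("pooled after commit completes: " + processor.pool.size()); // 1
    }
}
```

Until the commit completes, the pool is empty, so a concurrent onTrigger cannot pick up a consumer whose callbacks are still pending.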
NIFI-14598: If ConsumeKafka encounters an Exception while consuming, ensure
that we roll back the ProcessSession so that already-created FlowFiles are not
transferred before we revert the offsets. Also ensure that we cannot decrement
the active consumer counter more than once for the same consumer.
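The second change combines an ordering rule with an idempotent release. A minimal sketch, assuming a hypothetical ConsumerLease that increments a shared counter when acquired: the error path records the session rollback before the offset revert, and AtomicBoolean.compareAndSet ensures the counter is decremented at most once per lease, even when release() is reached from both a catch and a finally block. All names here are illustrative, not the NiFi implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class ConsumeErrorHandling {
    // Hypothetical counter of consumers currently in use.
    static final AtomicInteger activeConsumers = new AtomicInteger();
    // Records the order of cleanup steps so the sequence can be checked.
    static final List<String> events = new ArrayList<>();

    // Hypothetical lease: ties exactly one decrement to one acquired consumer.
    static final class ConsumerLease {
        private final AtomicBoolean released = new AtomicBoolean(false);

        ConsumerLease() {
            activeConsumers.incrementAndGet();
        }

        void release() {
            // Only the first caller flips the flag, so later calls are no-ops.
            if (released.compareAndSet(false, true)) {
                activeConsumers.decrementAndGet();
            }
        }
    }

    static void handleConsumeFailure(ConsumerLease lease) {
        // 1. Roll back the session first, so FlowFiles already created from
        //    this batch are discarded instead of transferred.
        events.add("session.rollback");
        // 2. Only then revert the consumer to its last committed offsets.
        events.add("consumer.rollback");
        lease.release();
    }

    public static void main(String[] args) {
        ConsumerLease lease = new ConsumerLease();
        try {
            throw new RuntimeException("simulated failure while consuming");
        } catch (RuntimeException e) {
            handleConsumeFailure(lease);
        } finally {
            lease.release(); // second release: does not decrement again
        }
        System.out.println(events + " active=" + activeConsumers.get());
        // prints: [session.rollback, consumer.rollback] active=0
    }
}
```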
NIFI-14598: Fixed bug in which ConsumeKafka duplicated the last record upon
restart
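The commit message does not state the root cause of the duplicated record. As general background only: Kafka treats a committed offset as the position of the next record to read, so the offset to commit is the last processed offset plus one, and committing the last processed offset itself redelivers that record after a restart. A minimal sketch of that rule, assuming partitions are identified by plain strings (a hypothetical simplification of Kafka's TopicPartition):

```java
import java.util.HashMap;
import java.util.Map;

public class NextOffset {
    // Given the offset of the last record processed per partition, return the
    // offsets to commit. A committed offset names the NEXT record to consume,
    // so each value is lastProcessed + 1.
    static Map<String, Long> offsetsToCommit(Map<String, Long> lastProcessed) {
        Map<String, Long> result = new HashMap<>();
        lastProcessed.forEach((partition, offset) -> result.put(partition, offset + 1));
        return result;
    }

    public static void main(String[] args) {
        Map<String, Long> last = new HashMap<>();
        last.put("topic-0", 41L); // record at offset 41 was the last one processed
        System.out.println(offsetsToCommit(last)); // {topic-0=42}
    }
}
```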
This closes #9971.
Signed-off-by: Peter Turcsanyi
> ConsumeKafka makes KafkaConsumer object reusable before it should
> -----------------------------------------------------------------
>
> Key: NIFI-14598
> URL: https://issues.apache.org/jira/browse/NIFI-14598
> Project: Apache NiFi
> Issue Type: Bug
> Components: Extensions
> Reporter: Mark Payne
> Assignee: Mark Payne
> Priority: Major
> Time Spent: 1h
> Remaining Estimate: 0h
>
> ConsumeKafka maintains a BlockingQueue<KafkaConsumerService> in order to
> provide pooling of consumers. After it pulls some data and transfers it to
> the 'success' relationship, the Processor then calls
> {{ProcessSession.commitAsync(Runnable, Consumer<Throwable>)}} such that on
> success, it commits the offsets and on failure, it rolls back. It then, in a
> {{finally}} block, adds the KafkaConsumer back to the {{BlockingQueue}}.
> This is unsafe because the consumer is not thread-safe: as soon as it is
> back in the queue, another thread can pull it and start using it, while the
> commitAsync callbacks, which reference the same ConsumerService, may still
> run on a separate thread.
> We need to ensure that we do not re-queue the consumer until after the
> session commit completes (successfully or otherwise). Additionally, the
> onTrigger method may get called again before the async commit completes,
> which would result in creating a new Consumer. To prevent this, we need to
> keep track of how many active consumers there are and never exceed the
> configured Max Concurrent Tasks.
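One way to enforce the limit described above is a compare-and-set loop on an active-consumer counter. This is a minimal sketch, assuming a hypothetical BoundedConsumers helper (not part of NiFi): tryAcquire() increments the counter only while it is below maxConcurrentTasks, so an onTrigger call that arrives before a pending async commit completes would get false and return early instead of creating a new consumer.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedConsumers {
    private final int maxConcurrentTasks;
    private final AtomicInteger active = new AtomicInteger();

    BoundedConsumers(int maxConcurrentTasks) {
        this.maxConcurrentTasks = maxConcurrentTasks;
    }

    // Returns true if the caller may use (or create) a consumer. The CAS loop
    // increments only while the count is below the limit, so concurrent
    // callers can never push it past maxConcurrentTasks.
    boolean tryAcquire() {
        while (true) {
            int current = active.get();
            if (current >= maxConcurrentTasks) {
                return false;
            }
            if (active.compareAndSet(current, current + 1)) {
                return true;
            }
        }
    }

    // Hypothetically called from the commit callbacks, once the consumer is
    // handed back to the pool.
    void release() {
        active.decrementAndGet();
    }

    int activeCount() {
        return active.get();
    }

    public static void main(String[] args) {
        BoundedConsumers limiter = new BoundedConsumers(2);
        System.out.println(limiter.tryAcquire()); // true
        System.out.println(limiter.tryAcquire()); // true
        System.out.println(limiter.tryAcquire()); // false: limit reached
        limiter.release();
        System.out.println(limiter.tryAcquire()); // true
    }
}
```

A java.util.concurrent.Semaphore with tryAcquire() would express the same bound; the explicit counter is shown here because it also exposes the current count.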
--
This message was sent by Atlassian Jira
(v8.20.10#820010)