[
https://issues.apache.org/jira/browse/NIFI-14727?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
David Handermann resolved NIFI-14727.
-------------------------------------
Fix Version/s: 2.7.0
Resolution: Fixed
> Add new ConsumeKinesis Processor
> --------------------------------
>
> Key: NIFI-14727
> URL: https://issues.apache.org/jira/browse/NIFI-14727
> Project: Apache NiFi
> Issue Type: New Feature
> Components: Extensions
> Reporter: Alaksiej Ščarbaty
> Assignee: Alaksiej Ščarbaty
> Priority: Major
> Fix For: 2.7.0
>
> Attachments: image-2025-08-19-15-51-52-944.png,
> image-2025-08-19-15-52-35-654.png
>
> Time Spent: 18h 10m
> Remaining Estimate: 0h
>
> h2. Goals
> * Create a new version of the processor which doesn't rely on
> ProcessSessionFactory and provides at-least-once delivery guarantees.
> * Use Kinesis Client Library v3.
> h2. Non-Goals
> * Using the thin AWS SDK library with manual consumer and offset tracking.
> _Error-prone and much more complex._
> h1. Terminology
> [Kinesis
> terminology|https://docs.aws.amazon.com/streams/latest/dev/key-concepts.html].
> h1. Proposed solution
> h2. Involved Components
> *Kinesis Client Library (KCL) v3* - a thick Kinesis client maintained by AWS
> which is responsible for coordinating consumers, retrieving records, and
> checkpointing progress. Interaction happens via callbacks in RecordProcessor.
> *ConsumeKinesisRecordProcessor* - our implementation of a RecordProcessor
> which is passed into KCL.
> *ShardBuffer* - an object which buffers consumed records for a single shard.
> *RecordBuffer* - an object that holds all shard buffers, manages the buffers'
> lifecycle, ensures exclusive read access to shard buffers, and keeps track of
> consumed memory.
> h2. Hooks from KCL
> Any event coming from *KCL* is pushed into
> {*}ConsumeKinesisRecordProcessor{*}, which acts as an adapter between *KCL*
> and {*}RecordBuffer{*}.
> *RecordBuffer* performs any necessary checks and routes the event to a
> designated {*}ShardBuffer{*}.
> *ShardBuffer* reacts accordingly, then returns the results to
> {*}RecordBuffer{*}, if appropriate. _More on the ShardBuffer structure later._
> An important note on initialization: technically, *KCL* can remove a shard
> from a consumer and then immediately assign it back. In this case it’s unsafe
> to keep the old {*}ShardBuffer{*}, so its data must be dropped and the buffer
> must be recreated. Each *ShardBuffer* is assigned a unique id, which makes it
> explicitly identifiable even when the shard id is the same.
> !image-2025-08-19-15-51-52-944.png!
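The re-initialization rule above can be sketched roughly as follows. This is only an illustration under assumptions, not the code from the pull request: `ShardBufferRegistry` and its methods are hypothetical names, records are plain strings, and leasing and memory tracking are left out. The point is that a fresh buffer id is issued on every initialization, so events addressed to an old buffer of the same shard can be detected and rejected.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the part of RecordBuffer that owns shard buffers.
final class ShardBufferRegistry {
    private long nextBufferId = 1;
    // Shard id -> id of the buffer currently serving that shard.
    private final Map<String, Long> currentBufferIdByShard = new HashMap<>();
    // Buffer id -> buffered records (strings stand in for Kinesis records).
    private final Map<Long, List<String>> recordsByBufferId = new HashMap<>();

    // Called when a shard is (re)assigned: drop the old buffer, create a new one.
    synchronized long initialize(String shardId) {
        Long oldId = currentBufferIdByShard.get(shardId);
        if (oldId != null) {
            recordsByBufferId.remove(oldId); // old data is unsafe to keep
        }
        long newId = nextBufferId++;
        currentBufferIdByShard.put(shardId, newId);
        recordsByBufferId.put(newId, new ArrayList<>());
        return newId;
    }

    // Routes records to a buffer; returns false when the buffer id is stale.
    synchronized boolean addRecords(long bufferId, List<String> records) {
        List<String> buffer = recordsByBufferId.get(bufferId);
        if (buffer == null) {
            return false;
        }
        buffer.addAll(records);
        return true;
    }

    synchronized int bufferedCount(long bufferId) {
        List<String> buffer = recordsByBufferId.get(bufferId);
        return buffer == null ? 0 : buffer.size();
    }
}
```

With this shape, a callback that still holds the id issued before the shard was reassigned gets `false` from `addRecords` instead of silently writing into the new buffer.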
> h2. Hooks from ConsumeKinesis processor
> Conceptually, the data consumption cycle consists of 3 steps: acquiring a
> lease on a {*}ShardBuffer{*}; consuming the records from the
> {*}ShardBuffer{*}; and returning the lease, committing the progress on
> success.
> Since leasing is involved, all interactions must happen via {*}RecordBuffer{*},
> which ensures the lease was not invalidated and routes the requests to a
> particular {*}ShardBuffer{*}.
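A minimal sketch of the three-step cycle, assuming a simplified model in which records are strings and buffers are identified by a numeric id. `BufferLease`, `LeasingRecordBuffer` and all method names are hypothetical and chosen for illustration only; the actual processor may structure this differently. The sketch focuses on lease validation and ignores the internal queue structure of a shard buffer.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical lease handle: names one buffer and remembers how many records were handed out.
final class BufferLease {
    final long bufferId;
    int handedOut;

    BufferLease(long bufferId) {
        this.bufferId = bufferId;
    }
}

// Hypothetical stand-in for RecordBuffer, reduced to the leasing logic.
final class LeasingRecordBuffer {
    private final Map<Long, List<String>> recordsByBuffer = new HashMap<>();
    private final Map<Long, BufferLease> activeLeases = new HashMap<>();

    synchronized void addRecords(long bufferId, List<String> records) {
        recordsByBuffer.computeIfAbsent(bufferId, id -> new ArrayList<>()).addAll(records);
    }

    // Step 1: lease any non-empty buffer that is not already leased.
    synchronized Optional<BufferLease> acquireLease() {
        for (Map.Entry<Long, List<String>> entry : recordsByBuffer.entrySet()) {
            if (!entry.getValue().isEmpty() && !activeLeases.containsKey(entry.getKey())) {
                BufferLease lease = new BufferLease(entry.getKey());
                activeLeases.put(entry.getKey(), lease);
                return Optional.of(lease);
            }
        }
        return Optional.empty();
    }

    // Step 2: read the leased buffer; an invalidated lease yields no records.
    synchronized List<String> consume(BufferLease lease) {
        if (!isValid(lease)) {
            return new ArrayList<>();
        }
        List<String> snapshot = new ArrayList<>(recordsByBuffer.get(lease.bufferId));
        lease.handedOut = snapshot.size();
        return snapshot;
    }

    // Step 3 on success: drop the consumed records and release the lease.
    synchronized boolean commit(BufferLease lease) {
        if (!isValid(lease)) {
            return false;
        }
        recordsByBuffer.get(lease.bufferId).subList(0, lease.handedOut).clear();
        activeLeases.remove(lease.bufferId);
        return true;
    }

    // Step 3 on failure: release the lease and keep the records for a retry.
    synchronized void rollback(BufferLease lease) {
        if (isValid(lease)) {
            activeLeases.remove(lease.bufferId);
        }
    }

    // Called when the shard is taken away: drops the buffer and voids its lease.
    synchronized void invalidate(long bufferId) {
        recordsByBuffer.remove(bufferId);
        activeLeases.remove(bufferId);
    }

    private boolean isValid(BufferLease lease) {
        return activeLeases.get(lease.bufferId) == lease;
    }
}
```

Because every call goes through the one object that owns the lease table, a consumer holding a lease on a buffer that has since been invalidated cannot commit progress for it.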
> h3. ShardBuffer structure
> *ShardBuffer* is a simple buffer which contains 2 queues of records. Newly
> received records are stored in a _PENDING_ queue. When a processor wants to
> consume the buffer, all records in the _PENDING_ queue are moved to the
> _IN_PROGRESS_ queue, and all _IN_PROGRESS_ records are returned.
> In case of success, we’re sure all records have been processed, so the
> progress is checkpointed and all data in the _IN_PROGRESS_ queue is dropped.
> In case of failure, it’s unclear whether the records were processed, so
> everything is kept as is: records stay in the _IN_PROGRESS_ queue, so the
> processing can be retried.
> !image-2025-08-19-15-52-35-654.png!
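The two-queue behaviour described above could look roughly like this. It is a sketch under assumptions: `TwoQueueShardBuffer` and its method names are hypothetical, records are plain strings, and the checkpoint is modelled as "the last committed record" rather than a real KCL checkpoint call.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch of a shard buffer with PENDING and IN_PROGRESS queues.
final class TwoQueueShardBuffer {
    private final Deque<String> pending = new ArrayDeque<>();
    private final Deque<String> inProgress = new ArrayDeque<>();
    private String lastCheckpoint; // stand-in for a KCL checkpoint

    // Newly received records land in PENDING.
    synchronized void offer(String record) {
        pending.addLast(record);
    }

    // Moves everything from PENDING to IN_PROGRESS and returns all IN_PROGRESS records.
    synchronized List<String> consume() {
        inProgress.addAll(pending);
        pending.clear();
        return new ArrayList<>(inProgress);
    }

    // Success: checkpoint at the last returned record and drop IN_PROGRESS.
    synchronized void commit() {
        if (!inProgress.isEmpty()) {
            lastCheckpoint = inProgress.peekLast();
        }
        inProgress.clear();
    }

    // Failure: nothing to do, records stay in IN_PROGRESS and are returned again.
    synchronized void rollback() {
    }

    synchronized String lastCheckpoint() {
        return lastCheckpoint;
    }
}
```

Note that a retry after a failure returns the earlier records plus anything that arrived in the meantime, which matches the at-least-once goal: a record may be delivered more than once, but it is not dropped before a successful commit.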
>