[ https://issues.apache.org/jira/browse/NIFI-14727?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

David Handermann resolved NIFI-14727.
-------------------------------------
    Fix Version/s: 2.7.0
       Resolution: Fixed

> Add new ConsumeKinesis Processor
> --------------------------------
>
>                 Key: NIFI-14727
>                 URL: https://issues.apache.org/jira/browse/NIFI-14727
>             Project: Apache NiFi
>          Issue Type: New Feature
>          Components: Extensions
>            Reporter: Alaksiej Ščarbaty
>            Assignee: Alaksiej Ščarbaty
>            Priority: Major
>             Fix For: 2.7.0
>
>         Attachments: image-2025-08-19-15-51-52-944.png, 
> image-2025-08-19-15-52-35-654.png
>
>          Time Spent: 18h 10m
>  Remaining Estimate: 0h
>
> h2. Goals
>  * Create a new version of the processor that doesn't rely on 
> ProcessSessionFactory and provides at-least-once delivery guarantees.
>  * Use Kinesis Client Library v3.
> h2. Non-Goals
>  * Using the thin AWS SDK library with manual consumer and offset tracking. 
> _Error-prone and much more complex._
> h1. Terminology
> [Kinesis 
> terminology|https://docs.aws.amazon.com/streams/latest/dev/key-concepts.html].
> h1. Proposed solution
> h2. Involved Components
> *Kinesis Client Library (KCL) v3* - a thick Kinesis client maintained by AWS 
> that is responsible for coordinating consumers, retrieving records and 
> checkpointing progress. Interaction happens via callbacks in a RecordProcessor.
> *ConsumeKinesisRecordProcessor* - our implementation of a RecordProcessor 
> that is passed to KCL.
> *ShardBuffer* - an object that buffers consumed records for a single shard.
> *RecordBuffer* - an object that holds all shard buffers, manages the buffers' 
> lifecycle, ensures exclusive read access to shard buffers and keeps track of 
> consumed memory.
> h2. Hooks from KCL
> Any event coming from *KCL* is pushed into 
> {*}ConsumeKinesisRecordProcessor{*}, which acts as an adapter between *KCL* 
> and {*}RecordBuffer{*}.
> *RecordBuffer* performs any necessary checks and routes the event to a 
> designated {*}ShardBuffer{*}.
> *ShardBuffer* reacts accordingly, then returns the results to 
> {*}RecordBuffer{*}, if appropriate. _More on the ShardBuffer structure later._
> Important note on initialization: technically, *KCL* may remove a shard 
> from a consumer and then immediately assign it back. In this case it’s 
> unsafe to keep the old {*}ShardBuffer{*}, so its data must be dropped and the 
> buffer must be recreated. Each *ShardBuffer* is assigned a unique id, which 
> makes it explicitly identifiable even when the shard id is the same.
> !image-2025-08-19-15-51-52-944.png!
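A minimal Java sketch of the unique-id rule, assuming a hypothetical `RecordBuffer` that hands out monotonically increasing `long` ids (the id scheme and the method names `createBuffer` and `isCurrent` are illustrative, not taken from the implementation). Recreating the buffer for the same shard yields a new id, so anything still holding the old id can be detected as stale.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Each ShardBuffer carries a unique id in addition to its shard id.
class ShardBuffer {
    final long id;
    final String shardId;

    ShardBuffer(long id, String shardId) {
        this.id = id;
        this.shardId = shardId;
    }
}

class RecordBuffer {
    // Hypothetical id source: ids are never reused within the JVM.
    private static final AtomicLong NEXT_ID = new AtomicLong();
    private final Map<String, ShardBuffer> buffersByShard = new HashMap<>();

    // Called when KCL (re)assigns a shard. Any previous buffer for the same
    // shard is replaced, which drops its data, and the new buffer gets a new id.
    synchronized long createBuffer(String shardId) {
        ShardBuffer buffer = new ShardBuffer(NEXT_ID.incrementAndGet(), shardId);
        buffersByShard.put(shardId, buffer);
        return buffer.id;
    }

    // True only if bufferId still identifies the current buffer for the shard.
    synchronized boolean isCurrent(String shardId, long bufferId) {
        ShardBuffer buffer = buffersByShard.get(shardId);
        return buffer != null && buffer.id == bufferId;
    }
}
```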
> h2. Hooks from ConsumeKinesis processor
> Conceptually, the data consumption cycle consists of 3 steps:
>  * acquiring a lease on a {*}ShardBuffer{*};
>  * consuming the records from the {*}ShardBuffer{*};
>  * returning the lease, committing the progress on success.
> Since leasing is involved, all interactions must happen via *RecordBuffer*, 
> which ensures the lease has not been invalidated and routes the requests to the 
> corresponding {*}ShardBuffer{*}.
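The three-step cycle and the lease check can be sketched as follows. This is a hypothetical, simplified Java model: `Lease`, `acquireLease` and `returnLease` are illustrative names, the per-shard record queues are collapsed into a single list, and the `checkpointed` list stands in for a real KCL checkpoint. Step 2 (consuming the records) happens in the caller between the two calls.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical lease handle: identifies the exact ShardBuffer instance it came from.
record Lease(String shardId, long bufferId, List<String> records) {}

class ShardBuffer {
    final long id;
    final List<String> records = new ArrayList<>(); // simplified: single queue
    boolean leased;

    ShardBuffer(long id) {
        this.id = id;
    }
}

class RecordBuffer {
    private long nextId;
    private final Map<String, ShardBuffer> buffers = new HashMap<>();
    private final List<String> checkpointed = new ArrayList<>(); // stand-in for KCL checkpointing

    // A (re)assigned shard always gets a fresh buffer with a new id.
    synchronized void createBuffer(String shardId) {
        buffers.put(shardId, new ShardBuffer(++nextId));
    }

    synchronized void addRecords(String shardId, List<String> records) {
        ShardBuffer buffer = buffers.get(shardId);
        if (buffer != null) {
            buffer.records.addAll(records);
        }
    }

    // Step 1: lease a buffer that has data and is not leased by anyone else.
    synchronized Optional<Lease> acquireLease() {
        for (Map.Entry<String, ShardBuffer> entry : buffers.entrySet()) {
            ShardBuffer buffer = entry.getValue();
            if (!buffer.leased && !buffer.records.isEmpty()) {
                buffer.leased = true;
                return Optional.of(new Lease(entry.getKey(), buffer.id, List.copyOf(buffer.records)));
            }
        }
        return Optional.empty();
    }

    // Step 3: return the lease. Progress is committed only on success and only
    // if the lease still refers to the current buffer for that shard.
    synchronized boolean returnLease(Lease lease, boolean success) {
        ShardBuffer buffer = buffers.get(lease.shardId());
        if (buffer == null || buffer.id != lease.bufferId()) {
            return false; // lease invalidated: shard was lost or reassigned
        }
        buffer.leased = false;
        if (success) {
            checkpointed.addAll(lease.records());
            // Leased records are at the head; records added later stay buffered.
            buffer.records.subList(0, lease.records().size()).clear();
        }
        return true;
    }

    synchronized int checkpointedCount() {
        return checkpointed.size();
    }
}
```

Because a lease records the buffer id rather than just the shard id, returning a lease after the shard was reassigned is rejected instead of checkpointing data that belongs to the old assignment.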
> h3. ShardBuffer structure
> *ShardBuffer* is a simple buffer that contains 2 record queues. Newly 
> received records are stored in the _PENDING_ queue. When the processor 
> consumes the buffer, all records in the _PENDING_ queue are moved to the 
> _IN_PROGRESS_ queue, and all _IN_PROGRESS_ records are returned.
> On success, all records are known to have been processed, so the progress is 
> checkpointed and all data in the _IN_PROGRESS_ queue is dropped.
> On failure, it’s unclear whether the records were processed, so everything is 
> kept as is: records remain in the _IN_PROGRESS_ queue so that processing can 
> be retried.
> !image-2025-08-19-15-52-35-654.png!
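A minimal Java sketch of the two-queue behavior, assuming records are plain strings and using a hypothetical `lastCheckpointed` field as a stand-in for the KCL checkpoint call. The method names `add`, `consume`, `commit` and `rollback` are illustrative.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class ShardBuffer {
    private final Deque<String> pending = new ArrayDeque<>();
    private final Deque<String> inProgress = new ArrayDeque<>();
    // Hypothetical stand-in for the KCL checkpoint: last committed record.
    private String lastCheckpointed;

    // New records from KCL always go to PENDING.
    synchronized void add(List<String> records) {
        pending.addAll(records);
    }

    // Move PENDING into IN_PROGRESS and return all IN_PROGRESS records,
    // including those kept from a previously failed attempt.
    synchronized List<String> consume() {
        inProgress.addAll(pending);
        pending.clear();
        return new ArrayList<>(inProgress);
    }

    // Success: every returned record was processed, so checkpoint and drop them.
    synchronized void commit() {
        if (!inProgress.isEmpty()) {
            lastCheckpointed = inProgress.peekLast();
        }
        inProgress.clear();
    }

    // Failure: the outcome is unknown, so IN_PROGRESS is kept for a retry.
    synchronized void rollback() {
        // Intentionally left unchanged.
    }

    synchronized String lastCheckpointed() {
        return lastCheckpointed;
    }

    synchronized int pendingSize() {
        return pending.size();
    }

    synchronized int inProgressSize() {
        return inProgress.size();
    }
}
```

Records that arrive while a batch is in flight land in _PENDING_, so a commit drops only the batch that was actually returned to the consumer.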
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
