[ https://issues.apache.org/jira/browse/NIFI-14727?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Alaksiej Ščarbaty updated NIFI-14727:
-------------------------------------
    Description: 
h2. Goals
 * Create a new version of the processor that doesn't rely on ProcessSessionFactory and provides at-least-once delivery guarantees.
 * Use Kinesis Client Library v3.

h2. Non-Goals
 * Using the thin AWS SDK library with manual consumer management and offset tracking. _Error-prone and much more complex._

h1. Terminology

[Kinesis terminology|https://docs.aws.amazon.com/streams/latest/dev/key-concepts.html].
h1. Proposed solution
h2. Involved Components

*Kinesis Client Library (KCL) v3* - a thick Kinesis client maintained by AWS, responsible for coordinating consumers, retrieving records and checkpointing progress. Interaction happens via callbacks on a RecordProcessor.

*ConsumeKinesisRecordProcessor* - our implementation of a RecordProcessor, which is passed to KCL.

*ShardBuffer* - an object which buffers consumed records for a single shard.

*RecordBuffer* - an object that holds all shard buffers, manages their lifecycle, ensures exclusive read access to them and keeps track of consumed memory.
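
The description only says that *RecordBuffer* keeps track of consumed memory. A minimal sketch of such accounting, assuming a byte counter that is raised when records are buffered and lowered when they are dropped, could look like the following. The class name MemoryTracker, its methods and the limit are hypothetical, and what RecordBuffer should do once the limit is reached is not specified in this issue.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper: RecordBuffer could own one instance and update it
// whenever records enter or leave any ShardBuffer.
final class MemoryTracker {
    private final long limitBytes; // hypothetical configurable limit
    private final AtomicLong usedBytes = new AtomicLong();

    MemoryTracker(long limitBytes) {
        this.limitBytes = limitBytes;
    }

    // Records were added to a ShardBuffer.
    void onBuffered(long bytes) {
        usedBytes.addAndGet(bytes);
    }

    // Records were dropped: committed, or discarded together with their buffer.
    void onDropped(long bytes) {
        usedBytes.addAndGet(-bytes);
    }

    long usedBytes() {
        return usedBytes.get();
    }

    // True once the buffered data reaches the limit.
    boolean limitReached() {
        return usedBytes.get() >= limitBytes;
    }
}
```

An AtomicLong is used because KCL callbacks and the processor's onTrigger threads may update the counter concurrently.
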
h2. Hooks from KCL

Any event coming from *KCL* is pushed into {*}ConsumeKinesisRecordProcessor{*}, 
which acts as an adapter between *KCL* and {*}RecordBuffer{*}.

*RecordBuffer* performs any necessary checks and routes the event to a 
designated {*}ShardBuffer{*}.

*ShardBuffer* reacts accordingly and, where appropriate, returns the results to {*}RecordBuffer{*}. _More on the ShardBuffer structure later._
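
The routing path (KCL, then *ConsumeKinesisRecordProcessor*, then *RecordBuffer*, then *ShardBuffer*) can be sketched as follows. This is a simplified illustration, not the actual implementation: KinesisRecord and all method names are hypothetical stand-ins, the real KCL callback signatures differ, and the *ShardBuffer* here holds a single list rather than the two queues described below.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a record delivered by KCL (not the real KCL type).
record KinesisRecord(String sequenceNumber, byte[] data) {}

// Simplified ShardBuffer with a single list of records.
final class ShardBuffer {
    private final List<KinesisRecord> records = new ArrayList<>();

    synchronized void add(List<KinesisRecord> batch) {
        records.addAll(batch);
    }

    synchronized int size() {
        return records.size();
    }
}

// Holds all shard buffers and routes each event to the buffer of its shard.
final class RecordBuffer {
    private final Map<String, ShardBuffer> buffers = new ConcurrentHashMap<>();

    void onShardAssigned(String shardId) {
        buffers.put(shardId, new ShardBuffer());
    }

    void onRecords(String shardId, List<KinesisRecord> batch) {
        ShardBuffer buffer = buffers.get(shardId);
        if (buffer == null) {
            // Check before routing: records for an unassigned shard are rejected.
            throw new IllegalStateException("Shard not assigned: " + shardId);
        }
        buffer.add(batch);
    }

    void onShardLost(String shardId) {
        buffers.remove(shardId);
    }

    int bufferedCount(String shardId) {
        ShardBuffer buffer = buffers.get(shardId);
        return buffer == null ? 0 : buffer.size();
    }
}

// Adapter between KCL callbacks and RecordBuffer, one instance per shard.
// Method names are simplified stand-ins, not the real KCL callback signatures.
final class ConsumeKinesisRecordProcessor {
    private final RecordBuffer recordBuffer;
    private String shardId;

    ConsumeKinesisRecordProcessor(RecordBuffer recordBuffer) {
        this.recordBuffer = recordBuffer;
    }

    void initialize(String shardId) {
        this.shardId = shardId;
        recordBuffer.onShardAssigned(shardId);
    }

    void processRecords(List<KinesisRecord> batch) {
        recordBuffer.onRecords(shardId, batch);
    }

    void leaseLost() {
        recordBuffer.onShardLost(shardId);
    }
}
```

Keeping the adapter free of logic means all checks live in *RecordBuffer*, which is also the single entry point used by the processor side.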

An important note on initialization: technically, it’s possible for *KCL* to remove a shard from a consumer and then immediately assign it back. In this case it’s unsafe to keep the old {*}ShardBuffer{*}, so its data must be dropped and the buffer must be recreated. Each *ShardBuffer* is assigned a unique id, which makes it explicitly identifiable even when the shard id is the same.

!image-2025-08-19-15-51-52-944.png!
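
One way to realise the unique buffer id is sketched below, assuming a monotonically increasing counter combined with the shard id. ShardBufferId, createBuffer and find are hypothetical names. A caller that still holds the id of a replaced buffer can no longer reach it, even though the shard id is unchanged.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical identity: the same shard id may map to different buffer ids over time.
record ShardBufferId(String shardId, long bufferId) {}

final class ShardBuffer {
    private final ShardBufferId id;

    ShardBuffer(ShardBufferId id) {
        this.id = id;
    }

    ShardBufferId id() {
        return id;
    }
}

final class RecordBuffer {
    private final AtomicLong idGenerator = new AtomicLong();
    private final Map<String, ShardBuffer> buffers = new ConcurrentHashMap<>();

    // Called on every shard assignment. Any previous buffer for the shard,
    // together with its data, is replaced because keeping it is unsafe.
    ShardBufferId createBuffer(String shardId) {
        ShardBufferId id = new ShardBufferId(shardId, idGenerator.incrementAndGet());
        buffers.put(shardId, new ShardBuffer(id));
        return id;
    }

    // Resolves a buffer only if the caller refers to its current incarnation.
    Optional<ShardBuffer> find(ShardBufferId id) {
        ShardBuffer current = buffers.get(id.shardId());
        if (current == null || !current.id().equals(id)) {
            return Optional.empty();
        }
        return Optional.of(current);
    }
}
```
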
h2. Hooks from ConsumeKinesis processor

Conceptually, the data consumption cycle consists of 3 steps: acquiring a lease on a {*}ShardBuffer{*}; consuming the records from the {*}ShardBuffer{*}; returning the lease and, on success, committing the progress.

Since leasing is involved, all interactions must happen via {*}RecordBuffer{*}, which ensures the lease has not been invalidated and routes the requests to a particular {*}ShardBuffer{*}.
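
The three-step cycle could be sketched as below, assuming a lease is identified by the shard id plus the buffer incarnation it was issued for. Lease, acquireLease, consume and returnLease are hypothetical names, records are plain strings, and dropping the handed-out records on success stands in for committing progress.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical lease handle: which shard, and which buffer incarnation.
record Lease(String shardId, long bufferId) {}

final class ShardBuffer {
    final long bufferId;
    final List<String> records = new ArrayList<>();
    boolean leased;
    int handedOut; // number of records returned by the last consume()

    ShardBuffer(long bufferId) {
        this.bufferId = bufferId;
    }
}

// All methods are synchronized, so one lock guards every buffer.
final class RecordBuffer {
    private final Map<String, ShardBuffer> buffers = new HashMap<>();
    private long nextBufferId;

    synchronized void assignShard(String shardId) {
        buffers.put(shardId, new ShardBuffer(++nextBufferId));
    }

    synchronized void add(String shardId, List<String> batch) {
        ShardBuffer buffer = buffers.get(shardId);
        if (buffer != null) {
            buffer.records.addAll(batch);
        }
    }

    // Step 1: lease a buffer; fails if the shard is unknown or already leased.
    synchronized Optional<Lease> acquireLease(String shardId) {
        ShardBuffer buffer = buffers.get(shardId);
        if (buffer == null || buffer.leased) {
            return Optional.empty();
        }
        buffer.leased = true;
        return Optional.of(new Lease(shardId, buffer.bufferId));
    }

    // Step 2: read the records; an invalidated lease yields nothing.
    synchronized List<String> consume(Lease lease) {
        ShardBuffer buffer = validate(lease);
        if (buffer == null) {
            return List.of();
        }
        buffer.handedOut = buffer.records.size();
        return List.copyOf(buffer.records);
    }

    // Step 3: return the lease. On success drop what was handed out;
    // on failure keep it so processing can be retried.
    synchronized boolean returnLease(Lease lease, boolean success) {
        ShardBuffer buffer = validate(lease);
        if (buffer == null) {
            return false; // e.g. the shard was reassigned in the meantime
        }
        if (success) {
            buffer.records.subList(0, buffer.handedOut).clear();
        }
        buffer.handedOut = 0;
        buffer.leased = false;
        return true;
    }

    private ShardBuffer validate(Lease lease) {
        ShardBuffer buffer = buffers.get(lease.shardId());
        boolean valid = buffer != null && buffer.bufferId == lease.bufferId() && buffer.leased;
        return valid ? buffer : null;
    }
}
```

Only the records handed out by consume() are dropped on success, so records that KCL delivers while a lease is held are not lost.
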
h3. ShardBuffer structure

*ShardBuffer* is a simple buffer containing 2 record queues. Newly received records are stored in the _PENDING_ queue. When a processor wants to consume the buffer, all records in the _PENDING_ queue are moved to the _IN_PROGRESS_ queue, and all _IN_PROGRESS_ records are returned.

On success, we’re sure all records have been processed, so the progress is checkpointed and all data in the _IN_PROGRESS_ queue is dropped.

On failure, it’s unclear whether the records were processed, so everything is kept as is: the records stay in the _IN_PROGRESS_ queue, so processing can be retried.

!image-2025-08-19-15-52-35-654.png!
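
A minimal sketch of the two-queue structure follows, assuming records carry a sequence number and that commit() reports the last one so the caller can pass it to KCL's checkpointer. BufferedRecord, consume, commit and rollback are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Optional;

// Hypothetical record type: sequence number plus payload.
record BufferedRecord(String sequenceNumber, String data) {}

final class ShardBuffer {
    private final Deque<BufferedRecord> pending = new ArrayDeque<>();
    private final Deque<BufferedRecord> inProgress = new ArrayDeque<>();

    // Newly received records wait in PENDING.
    synchronized void add(List<BufferedRecord> records) {
        pending.addAll(records);
    }

    // Move PENDING into IN_PROGRESS and return all IN_PROGRESS records.
    // Records kept after a failure come first, so ordering is preserved.
    synchronized List<BufferedRecord> consume() {
        inProgress.addAll(pending);
        pending.clear();
        return new ArrayList<>(inProgress);
    }

    // Success: drop IN_PROGRESS and return the last sequence number,
    // which the caller would hand to KCL's checkpointer.
    synchronized Optional<String> commit() {
        BufferedRecord last = inProgress.peekLast();
        inProgress.clear();
        return last == null ? Optional.empty() : Optional.of(last.sequenceNumber());
    }

    // Failure: deliberately a no-op; IN_PROGRESS is kept so processing can be retried.
    synchronized void rollback() {
    }
}
```

Because a retry re-sends records that may already have been processed, this structure gives at-least-once rather than exactly-once delivery.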

 


> Remove reference to ProcessSessionFactory from ConsumeKinesisStream processor
> -----------------------------------------------------------------------------
>
>                 Key: NIFI-14727
>                 URL: https://issues.apache.org/jira/browse/NIFI-14727
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Extensions
>            Reporter: Alaksiej Ščarbaty
>            Assignee: Alaksiej Ščarbaty
>            Priority: Major
>         Attachments: image-2025-08-19-15-51-52-944.png, 
> image-2025-08-19-15-52-35-654.png
>
>          Time Spent: 1h 50m
>  Remaining Estimate: 0h
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
