[https://issues.apache.org/jira/browse/NIFI-14727?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel]
Alaksiej Ščarbaty updated NIFI-14727:
-------------------------------------
Description:
h2. Goals
* Create a new version of the processor which doesn't rely on ProcessSessionFactory and provides at-least-once delivery guarantees.
* Use Kinesis Client Library v3.
h2. Non-Goals
* Using the thin AWS SDK library with manual consumer coordination and offset tracking. _Error-prone and much more complex._
h1. Terminology
[Kinesis terminology|https://docs.aws.amazon.com/streams/latest/dev/key-concepts.html].
h1. Proposed solution
h2. Involved Components
* *Kinesis Client Library (KCL) v3* - a thick Kinesis client maintained by AWS, responsible for coordinating consumers, retrieving records and checkpointing progress. Interaction happens via callbacks on a RecordProcessor.
* *ConsumeKinesisRecordProcessor* - our implementation of a RecordProcessor, which is passed into KCL.
* *ShardBuffer* - an object which buffers consumed records for a single shard.
* *RecordBuffer* - an object that holds all shard buffers, manages their lifecycle, ensures exclusive read access to each shard buffer and keeps track of consumed memory.
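The description says *RecordBuffer* keeps track of consumed memory but does not specify how. A minimal sketch, assuming a fixed byte limit and a hypothetical MemoryTracker helper (not a NiFi or KCL class), could use a lock-free counter:

```java
import java.util.concurrent.atomic.AtomicLong;

/**
 * Hypothetical helper: byte accounting that RecordBuffer could use.
 * The fixed limit and the method names are assumptions made for this sketch.
 */
final class MemoryTracker {
    private final long maxBytes;
    private final AtomicLong usedBytes = new AtomicLong();

    MemoryTracker(long maxBytes) {
        if (maxBytes <= 0) {
            throw new IllegalArgumentException("maxBytes must be positive");
        }
        this.maxBytes = maxBytes;
    }

    /** Reserves space for a batch; returns false (reserving nothing) if the limit would be exceeded. */
    boolean tryReserve(long bytes) {
        while (true) {
            long current = usedBytes.get();
            long next = current + bytes;
            if (next > maxBytes) {
                return false;
            }
            // Retry if another thread changed the counter between get() and the CAS.
            if (usedBytes.compareAndSet(current, next)) {
                return true;
            }
        }
    }

    /** Releases space once records are dropped (after a checkpoint or when a buffer is discarded). */
    void release(long bytes) {
        usedBytes.addAndGet(-bytes);
    }

    long usedBytes() {
        return usedBytes.get();
    }
}
```

Under this assumption, *RecordBuffer* would reserve a batch's size before adding it to a *ShardBuffer* and release it once the batch is checkpointed or the buffer is dropped. What happens when tryReserve fails (for example, waiting inside the KCL callback to apply backpressure) is a design decision the description leaves open.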
h2. Hooks from KCL
Any event coming from *KCL* is pushed into {*}ConsumeKinesisRecordProcessor{*},
which acts as an adapter between *KCL* and {*}RecordBuffer{*}.
*RecordBuffer* performs any necessary checks and routes the event to a
designated {*}ShardBuffer{*}.
*ShardBuffer* reacts accordingly, then returns the results to
{*}RecordBuffer{*}, if appropriate. _More on the ShardBuffer structure later._
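The adapter role of *ConsumeKinesisRecordProcessor* can be sketched as follows. To keep it compilable without the KCL dependency, the KCL callbacks are modelled as plain methods (initialize, processRecords, leaseLost, loosely mirroring KCL's record processor callbacks), records are simplified to strings, and RecordBufferEvents is a hypothetical interface standing in for *RecordBuffer*:

```java
import java.util.List;

/**
 * Hypothetical, simplified view of RecordBuffer as seen by the KCL adapter.
 * Method names are assumptions made for this sketch.
 */
interface RecordBufferEvents {
    long createBuffer(String shardId);                     // shard assigned to this consumer
    void addRecords(long bufferId, List<String> records);  // new records delivered by KCL
    void dropBuffer(long bufferId);                        // shard taken away from this consumer
}

/**
 * Simplified stand-in for ConsumeKinesisRecordProcessor. The real class would
 * implement KCL's record processor interface; here the callbacks are plain methods.
 */
final class ConsumeKinesisRecordProcessor {
    private final RecordBufferEvents recordBuffer;
    private long bufferId = -1; // assigned when KCL initializes this processor for a shard

    ConsumeKinesisRecordProcessor(RecordBufferEvents recordBuffer) {
        this.recordBuffer = recordBuffer;
    }

    void initialize(String shardId) {
        // Remember which buffer instance belongs to this assignment of the shard.
        bufferId = recordBuffer.createBuffer(shardId);
    }

    void processRecords(List<String> records) {
        // Forward only; checks such as buffer validity are RecordBuffer's job.
        recordBuffer.addRecords(bufferId, records);
    }

    void leaseLost() {
        recordBuffer.dropBuffer(bufferId);
    }
}
```

The processor holds no state besides the buffer id it obtained on initialization, so all checks stay in *RecordBuffer*. Other KCL callbacks, such as shard end and shutdown, are omitted from this sketch.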
Important note on initialization: it is technically possible for *KCL* to remove a shard from a consumer and then immediately assign it back. In this case it's unsafe to keep the old {*}ShardBuffer{*}, so its data must be dropped and the buffer must be recreated. Each *ShardBuffer* is assigned a unique id, which makes it explicitly identifiable even when the shard id is the same.
!image-2025-08-19-15-51-52-944.png!
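One way to implement the unique-id rule is to key buffers by shard id but validate every event against the buffer id handed out at initialization. The sketch below assumes hypothetical method names and a single lock for simplicity; it shows that re-assigning a shard discards the old data and that late events carrying the old id are ignored:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch of RecordBuffer's buffer lifecycle with unique buffer ids.
 * Method names are assumptions; synchronized methods keep the sketch simple.
 */
final class RecordBuffer {

    /** Minimal stand-in for ShardBuffer: shard id, unique buffer id and its records. */
    static final class ShardBuffer {
        final String shardId;
        final long id;
        final List<String> records = new ArrayList<>();

        ShardBuffer(String shardId, long id) {
            this.shardId = shardId;
            this.id = id;
        }
    }

    private final Map<String, ShardBuffer> buffersByShard = new HashMap<>();
    private long nextId = 1;

    /** Shard assigned: a fresh buffer replaces (and thereby drops) any old one for the same shard. */
    synchronized long createBuffer(String shardId) {
        ShardBuffer fresh = new ShardBuffer(shardId, nextId++);
        buffersByShard.put(shardId, fresh);
        return fresh.id;
    }

    /** Adds records only if bufferId still identifies the current buffer of its shard. */
    synchronized boolean addRecords(long bufferId, List<String> records) {
        ShardBuffer buffer = findById(bufferId);
        if (buffer == null) {
            return false; // stale id: the shard was removed or re-assigned since
        }
        buffer.records.addAll(records);
        return true;
    }

    /** Shard removed; a stale id is a no-op, so it cannot drop a newer buffer. */
    synchronized void dropBuffer(long bufferId) {
        ShardBuffer buffer = findById(bufferId);
        if (buffer != null) {
            buffersByShard.remove(buffer.shardId);
        }
    }

    synchronized int bufferedCount(String shardId) {
        ShardBuffer buffer = buffersByShard.get(shardId);
        return buffer == null ? 0 : buffer.records.size();
    }

    private ShardBuffer findById(long bufferId) {
        for (ShardBuffer buffer : buffersByShard.values()) {
            if (buffer.id == bufferId) {
                return buffer;
            }
        }
        return null;
    }
}
```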
h2. Hooks from ConsumeKinesis processor
Conceptually, the data consumption cycle consists of 3 steps:
# Acquire a lease on a {*}ShardBuffer{*}.
# Consume the records from the {*}ShardBuffer{*}.
# Return the lease, committing the progress on success.
Since leasing is involved, all interactions must happen via {*}RecordBuffer{*}, which ensures the lease has not been invalidated and routes the requests to the particular {*}ShardBuffer{*}.
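The three-step cycle can be sketched as below. LeasingRecordBuffer, Lease and all method names are hypothetical; records are strings, and the two-queue structure of *ShardBuffer* is approximated by remembering how many records the lease holder has read, so that records arriving during processing are not committed by mistake:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Hypothetical sketch of the acquire / consume / return cycle on RecordBuffer. */
final class LeasingRecordBuffer {

    /** Identifies one acquisition of one shard buffer. */
    static final class Lease {
        final String shardId;
        final long bufferId;
        final long leaseId;

        Lease(String shardId, long bufferId, long leaseId) {
            this.shardId = shardId;
            this.bufferId = bufferId;
            this.leaseId = leaseId;
        }
    }

    private static final class Entry {
        final long bufferId;
        final List<String> records = new ArrayList<>();
        long activeLeaseId = 0; // 0 means "not leased"
        int consumedCount = 0;  // records read by the current lease holder

        Entry(long bufferId) {
            this.bufferId = bufferId;
        }
    }

    private final Map<String, Entry> entries = new HashMap<>();
    private long nextId = 1; // shared counter for buffer and lease ids, never 0

    synchronized long createBuffer(String shardId) {
        Entry entry = new Entry(nextId++);
        entries.put(shardId, entry);
        return entry.bufferId;
    }

    synchronized void addRecords(String shardId, List<String> records) {
        Entry entry = entries.get(shardId);
        if (entry != null) {
            entry.records.addAll(records);
        }
    }

    /** Called when KCL removes the shard; any outstanding lease becomes invalid. */
    synchronized void dropShard(String shardId) {
        entries.remove(shardId);
    }

    /** Step 1: lease a buffer that has records and is not leased by anyone else. */
    synchronized Optional<Lease> acquireLease() {
        for (Map.Entry<String, Entry> e : entries.entrySet()) {
            Entry entry = e.getValue();
            if (entry.activeLeaseId == 0 && !entry.records.isEmpty()) {
                entry.activeLeaseId = nextId++;
                return Optional.of(new Lease(e.getKey(), entry.bufferId, entry.activeLeaseId));
            }
        }
        return Optional.empty();
    }

    /** Step 2: read the buffered records; an invalid lease yields nothing. */
    synchronized List<String> consume(Lease lease) {
        Entry entry = validEntry(lease);
        if (entry == null) {
            return List.of();
        }
        entry.consumedCount = entry.records.size();
        return List.copyOf(entry.records);
    }

    /** Step 3: give the lease back; on success only the records that were read are committed. */
    synchronized boolean returnLease(Lease lease, boolean success) {
        Entry entry = validEntry(lease);
        if (entry == null) {
            return false; // lease invalidated: nothing may be committed
        }
        if (success) {
            entry.records.subList(0, entry.consumedCount).clear(); // stand-in for checkpointing
        }
        entry.consumedCount = 0;
        entry.activeLeaseId = 0;
        return true;
    }

    private Entry validEntry(Lease lease) {
        Entry entry = entries.get(lease.shardId);
        boolean valid = entry != null
                && entry.bufferId == lease.bufferId
                && entry.activeLeaseId == lease.leaseId;
        return valid ? entry : null;
    }
}
```

Because returnLease re-validates the lease, a shard that KCL removed while the processor was working cannot be committed; in this sketch the false result is what would tell the caller to roll back rather than commit its NiFi session.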
h3. ShardBuffer structure
*ShardBuffer* is a simple buffer which contains 2 record queues. Newly received records are stored in the _PENDING_ queue. When the processor wants to consume the buffer, all records in the _PENDING_ queue are moved to the _IN_PROGRESS_ queue, and all _IN_PROGRESS_ records are returned.
On success, all records are known to have been processed, so the progress is checkpointed and all data in the _IN_PROGRESS_ queue is dropped.
On failure, it's unclear whether the records were processed, so everything is kept as is: the records stay in the _IN_PROGRESS_ queue and processing can be retried. A retried record may therefore be delivered more than once, but none is dropped before it is checkpointed, which is what gives the at-least-once guarantee.
!image-2025-08-19-15-52-35-654.png!
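A minimal sketch of the two-queue structure, assuming exclusive access is already guaranteed by *RecordBuffer* leasing (so the class itself is not thread-safe). The checkpoint is abstracted as a Runnable; in the real processor it would presumably go through KCL's checkpointer, which is an assumption here:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/**
 * Hypothetical sketch of ShardBuffer with PENDING and IN_PROGRESS queues.
 * Not thread-safe: exclusive access is assumed to come from RecordBuffer leasing.
 */
final class ShardBuffer<T> {
    private final Deque<T> pending = new ArrayDeque<>();
    private final Deque<T> inProgress = new ArrayDeque<>();

    /** KCL delivered new records: they wait in PENDING. */
    void add(List<T> records) {
        pending.addAll(records);
    }

    /** Moves everything from PENDING to IN_PROGRESS and returns all IN_PROGRESS records. */
    List<T> consume() {
        inProgress.addAll(pending);
        pending.clear();
        return new ArrayList<>(inProgress);
    }

    /** Success: checkpoint first, then drop IN_PROGRESS. On failure simply don't call this. */
    void commit(Runnable checkpoint) {
        checkpoint.run();   // if this throws, IN_PROGRESS is kept for a retry
        inProgress.clear();
    }

    int pendingSize() {
        return pending.size();
    }

    int inProgressSize() {
        return inProgress.size();
    }
}
```

Checkpointing before clearing _IN_PROGRESS_ matters: if the checkpoint fails, the records remain in place and the next consume() returns them again.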
> Remove reference to ProcessSessionFactory from ConsumeKinesisStream processor
> -----------------------------------------------------------------------------
>
> Key: NIFI-14727
> URL: https://issues.apache.org/jira/browse/NIFI-14727
> Project: Apache NiFi
> Issue Type: Improvement
> Components: Extensions
> Reporter: Alaksiej Ščarbaty
> Assignee: Alaksiej Ščarbaty
> Priority: Major
> Attachments: image-2025-08-19-15-51-52-944.png,
> image-2025-08-19-15-52-35-654.png
>
> Time Spent: 1h 50m
> Remaining Estimate: 0h
--
This message was sent by Atlassian Jira
(v8.20.10#820010)