[ 
https://issues.apache.org/jira/browse/NIFI-14727?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alaksiej Ščarbaty updated NIFI-14727:
-------------------------------------
    Description: 
h2. Goals
 * Create a new version of the processor which doesn't rely on 
ProcessSessionFactory and provides at-least-once delivery guarantees.
 * Use Kinesis Client Library v3.

h2. Non-Goals
 * Using the thin AWS SDK client with manual consumer coordination and offset 
tracking. _Error-prone and much more complex._

h1. Terminology

[Kinesis 
terminology|https://docs.aws.amazon.com/streams/latest/dev/key-concepts.html].
h1. Proposed solution
h2. Involved Components

*Kinesis Client Library (KCL) v3 -* a thick Kinesis client maintained by AWS 
which is responsible for coordinating consumers, retrieving records and 
checkpointing progress. Interaction happens via callbacks in RecordProcessor.

*ConsumeKinesisRecordProcessor* - our implementation of a RecordProcessor which 
is passed into KCL.

*ShardBuffer* - an object which buffers consumed records for a single shard.

*RecordBuffer* - an object that holds all shard buffers, manages the buffers' 
lifecycle, ensures exclusive read access to shard buffers, and keeps track of 
consumed memory.
h2. Hooks from KCL

Any event coming from *KCL* is pushed into {*}ConsumeKinesisRecordProcessor{*}, 
which acts as an adapter between *KCL* and {*}RecordBuffer{*}.

*RecordBuffer* performs any necessary checks and routes the event to a 
designated {*}ShardBuffer{*}.

*ShardBuffer* reacts accordingly, then returns the results to 
{*}RecordBuffer{*}, if appropriate. _More on the ShardBuffer structure later._

An important note on initialization: technically, *KCL* may remove a shard 
from a consumer and then immediately assign it back. In this case it’s unsafe 
to keep the old {*}ShardBuffer{*}, so its data must be dropped and the buffer 
recreated. Each *ShardBuffer* is assigned a unique id, which makes it 
explicitly identifiable even when the shard id is the same.
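
The re-initialization rule above can be sketched roughly as follows. This is only an illustration under assumptions: the class name {{ShardBufferRegistry}}, its methods and the use of a counter as the unique id are hypothetical and not taken from the actual implementation.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: names and signatures are illustrative only.
final class ShardBufferRegistry {

    // Source of unique buffer ids; every (re)initialization gets a fresh one.
    private final AtomicLong nextBufferId = new AtomicLong();

    // Shard id -> id of the buffer that is currently valid for that shard.
    private final Map<String, Long> activeBuffers = new ConcurrentHashMap<>();

    /** Called when KCL initializes a shard: any previous buffer for it is replaced. */
    long initialize(String shardId) {
        long bufferId = nextBufferId.incrementAndGet();
        activeBuffers.put(shardId, bufferId);
        return bufferId;
    }

    /** A request is valid only if it refers to the buffer that is currently active. */
    boolean isActive(String shardId, long bufferId) {
        Long current = activeBuffers.get(shardId);
        return current != null && current == bufferId;
    }
}
```

With such an id, a request that still refers to the buffer created before the shard was reassigned can be rejected, even though the shard id is unchanged.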
h2. Hooks from ConsumeKinesis processor

Conceptually, the data consumption cycle consists of 3 steps: acquiring a lease 
on a {*}ShardBuffer{*}; consuming the records from the {*}ShardBuffer{*}; and 
returning the lease, committing the progress on success.

Since leasing is involved, all interactions must happen via {*}RecordBuffer{*}, 
which ensures the lease has not been invalidated and routes each request to the 
corresponding {*}ShardBuffer{*}.
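
A minimal sketch of the leasing part of this cycle, assuming a lease is identified simply by the shard id. The class {{LeasingSketch}} and its methods are hypothetical; the real *RecordBuffer* also tracks memory and holds the buffered records.

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;

// Hypothetical sketch of exclusive leasing; not the actual RecordBuffer API.
final class LeasingSketch {

    // Shards whose buffers are free to be leased.
    private final Queue<String> available = new ArrayDeque<>();

    // Shards whose buffers are currently leased to a processor thread.
    private final Set<String> leased = new HashSet<>();

    synchronized void register(String shardId) {
        available.add(shardId);
    }

    /** Step 1: acquire exclusive access to some shard buffer, if one is free. */
    synchronized Optional<String> acquire() {
        String shardId = available.poll();
        if (shardId == null) {
            return Optional.empty();
        }
        leased.add(shardId);
        return Optional.of(shardId);
    }

    /** Step 3: return the lease. Returns false if the lease was invalidated meanwhile. */
    synchronized boolean release(String shardId) {
        if (!leased.remove(shardId)) {
            return false;
        }
        available.add(shardId);
        return true;
    }

    /** Called when the shard is taken away: any outstanding lease becomes invalid. */
    synchronized void invalidate(String shardId) {
        leased.remove(shardId);
        available.remove(shardId);
    }
}
```

In this sketch a caller that gets {{false}} from {{release}} knows its lease is stale and must not commit any progress for that shard.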
h3. ShardBuffer structure

*ShardBuffer* is a simple buffer which contains 2 queues of records. Newly 
received records are stored in the _PENDING_ queue. When a processor wants to 
consume the buffer, all records in the _PENDING_ queue are moved to the 
_IN_PROGRESS_ queue, and all _IN_PROGRESS_ records are returned.

In case of success, we’re sure all records have been processed, so the progress 
is checkpointed and all data in the _IN_PROGRESS_ queue is dropped.

In case of failure, it’s unclear whether the records were processed, so 
everything is kept as is: the records stay in the _IN_PROGRESS_ queue and are 
returned again on the next attempt, so the processing can be retried.
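
The two-queue behaviour described above could look roughly like the sketch below. It is an illustration only: the class {{TwoQueueShardBuffer}} and its method names are hypothetical, and the actual checkpoint call to KCL is left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch of the PENDING / IN_PROGRESS structure; not the real ShardBuffer.
final class TwoQueueShardBuffer<R> {

    private final Deque<R> pending = new ArrayDeque<>();
    private final Deque<R> inProgress = new ArrayDeque<>();

    /** Records delivered by KCL are appended to the PENDING queue. */
    synchronized void offer(List<R> records) {
        pending.addAll(records);
    }

    /** Moves everything from PENDING to IN_PROGRESS and returns all IN_PROGRESS records. */
    synchronized List<R> consume() {
        inProgress.addAll(pending);
        pending.clear();
        return new ArrayList<>(inProgress);
    }

    /** Success: progress would be checkpointed here, then IN_PROGRESS is dropped. */
    synchronized void commit() {
        inProgress.clear();
    }

    /** Failure: nothing changes, so the same records are returned by the next consume(). */
    synchronized void rollback() {
        // Intentionally empty: records stay in IN_PROGRESS for a retry.
    }
}
```

Because records from a failed attempt are returned again, a downstream consumer may see them more than once, which matches the at least once guarantee stated in the goals.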

 

  was:
Create a new version of the processor, which doesn't rely on 
ProcessSessionFactory.

The new processor buffers all consumed records, which are written to flow files 
in onTrigger method.

The new processor uses KCL V3.


> Remove reference to ProcessSessionFactory from ConsumeKinesisStream processor
> -----------------------------------------------------------------------------
>
>                 Key: NIFI-14727
>                 URL: https://issues.apache.org/jira/browse/NIFI-14727
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Extensions
>            Reporter: Alaksiej Ščarbaty
>            Assignee: Alaksiej Ščarbaty
>            Priority: Major
>          Time Spent: 1h 50m
>  Remaining Estimate: 0h



--
This message was sent by Atlassian Jira
(v8.20.10#820010)