[ https://issues.apache.org/jira/browse/NIFI-14727?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Alaksiej Ščarbaty updated NIFI-14727:
-------------------------------------
Description:
h2. Goals
* Create a new version of the processor that doesn't rely on ProcessSessionFactory and provides at-least-once delivery guarantees.
* Use Kinesis Client Library v3.
h2. Non-Goals
* Using the thin AWS SDK library with manual consumer and offset tracking. _Error-prone and much more complex._
h1. Terminology
[Kinesis terminology|https://docs.aws.amazon.com/streams/latest/dev/key-concepts.html].
h1. Proposed solution
h2. Involved Components
*Kinesis Client Library (KCL) v3* - a thick Kinesis client maintained by AWS that coordinates consumers, retrieves records, and checkpoints progress. Interaction happens via callbacks on a RecordProcessor.
*ConsumeKinesisRecordProcessor* - our implementation of a RecordProcessor, which is passed to KCL.
*ShardBuffer* - an object that buffers consumed records for a single shard.
*RecordBuffer* - an object that holds all shard buffers, manages their lifecycle, ensures exclusive read access to each shard buffer, and keeps track of consumed memory.
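The description does not say how RecordBuffer accounts for memory. A minimal sketch, assuming a hypothetical fixed byte limit and a caller that knows the size of each record batch: a lock-free counter where space is reserved when records are buffered and released when they are checkpointed or their ShardBuffer is dropped. The names MemoryTracker, tryReserve and release are illustrative, not part of the proposal.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical memory accounting for RecordBuffer; the byte limit is an assumption. */
final class MemoryTracker {
    private final long maxBytes;                       // assumed upper bound on buffered bytes
    private final AtomicLong usedBytes = new AtomicLong();

    MemoryTracker(long maxBytes) {
        this.maxBytes = maxBytes;
    }

    /** Reserves space for newly consumed records; returns false if the limit would be exceeded. */
    boolean tryReserve(long bytes) {
        while (true) {
            long current = usedBytes.get();
            long next = current + bytes;
            if (next > maxBytes) {
                return false;                          // nothing reserved
            }
            if (usedBytes.compareAndSet(current, next)) {
                return true;                           // reservation succeeded atomically
            }
            // another thread changed the counter concurrently: retry with the fresh value
        }
    }

    /** Releases space once records are checkpointed or their ShardBuffer is dropped. */
    void release(long bytes) {
        usedBytes.addAndGet(-bytes);
    }

    long usedBytes() {
        return usedBytes.get();
    }
}
```

A caller that gets false from tryReserve could, for example, hold back the next batch until space is released; how the real processor reacts to a full buffer is not specified in the description.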
h2. Hooks from KCL
Any event coming from *KCL* is pushed into {*}ConsumeKinesisRecordProcessor{*},
which acts as an adapter between *KCL* and {*}RecordBuffer{*}.
*RecordBuffer* performs any necessary checks and routes the event to a
designated {*}ShardBuffer{*}.
*ShardBuffer* reacts accordingly, then returns the results to
{*}RecordBuffer{*}, if appropriate. _More on the ShardBuffer structure later._
An important note on initialization: *KCL* may remove a shard from a consumer and then immediately assign it back. In this case it’s unsafe to keep the old {*}ShardBuffer{*}, so its data must be dropped and the buffer recreated. Each *ShardBuffer* is assigned a unique id, which makes it explicitly identifiable even when the shard id is the same.
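The KCL hooks and the reassignment rule above can be sketched as a simplified model. This is not KCL's API: the callbacks are loosely modeled on KCL's record processor (the real interface takes richer input objects and has more callbacks), records are plain Strings, and method names such as createBuffer, addRecords and removeBuffer are hypothetical. The unique buffer id lets RecordBuffer ignore late events that belong to a previous assignment of the same shard.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/** Minimal ShardBuffer stand-in: only its identity and pending records matter here. */
final class ShardBuffer {
    private static final AtomicLong NEXT_ID = new AtomicLong();

    final long id = NEXT_ID.incrementAndGet(); // unique even when the shard id repeats
    final String shardId;
    final List<String> pending = new ArrayList<>();

    ShardBuffer(String shardId) {
        this.shardId = shardId;
    }
}

/** Holds one ShardBuffer per shard and routes events to it after checking buffer identity. */
final class RecordBuffer {
    private final Map<String, ShardBuffer> buffers = new ConcurrentHashMap<>();

    /** A (re)assigned shard always gets a fresh buffer; any stale one is dropped with its data. */
    long createBuffer(String shardId) {
        ShardBuffer fresh = new ShardBuffer(shardId);
        buffers.put(shardId, fresh);
        return fresh.id;
    }

    /** Appends records only if the caller still owns the current buffer for the shard. */
    boolean addRecords(String shardId, long bufferId, List<String> records) {
        ShardBuffer buffer = buffers.get(shardId);
        if (buffer == null || buffer.id != bufferId) {
            return false; // event from an outdated assignment: ignore it
        }
        synchronized (buffer) {
            buffer.pending.addAll(records);
        }
        return true;
    }

    /** Removes the buffer only if it is still the one identified by bufferId. */
    void removeBuffer(String shardId, long bufferId) {
        buffers.computeIfPresent(shardId, (key, buffer) -> buffer.id == bufferId ? null : buffer);
    }

    int pendingCount(String shardId) {
        ShardBuffer buffer = buffers.get(shardId);
        if (buffer == null) {
            return 0;
        }
        synchronized (buffer) {
            return buffer.pending.size();
        }
    }
}

/** Adapter from KCL-style callbacks to RecordBuffer; callback signatures are simplified. */
final class ConsumeKinesisRecordProcessor {
    private final RecordBuffer recordBuffer;
    private String shardId;
    private long bufferId;

    ConsumeKinesisRecordProcessor(RecordBuffer recordBuffer) {
        this.recordBuffer = recordBuffer;
    }

    void initialize(String shardId) {
        this.shardId = shardId;
        this.bufferId = recordBuffer.createBuffer(shardId);
    }

    void processRecords(List<String> records) {
        recordBuffer.addRecords(shardId, bufferId, records);
    }

    void leaseLost() {
        recordBuffer.removeBuffer(shardId, bufferId);
    }
}
```

For example, if a leaseLost callback from the old assignment arrives after the shard was reassigned, removeBuffer sees a different id and leaves the new buffer untouched.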
h2. Hooks from ConsumeKinesis processor
Conceptually, the data consumption cycle consists of 3 steps: acquiring a lease on a {*}ShardBuffer{*}, consuming the records from that {*}ShardBuffer{*}, and returning the lease, committing the progress on success.
Since leasing is involved, all interactions must happen via {*}RecordBuffer{*}, which ensures the lease has not been invalidated and routes the requests to the corresponding {*}ShardBuffer{*}.
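The three-step consumption cycle can be sketched as follows, assuming a hypothetical Lease handle that records which ShardBuffer instance it was issued for. For brevity, this ShardBuffer keeps a single record list instead of two queues, and an integer counter stands in for the KCL checkpoint; all method names are illustrative. A lease is rejected once its buffer has been dropped or recreated.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical lease handle: identifies the exact ShardBuffer instance it was issued for. */
final class Lease {
    final String shardId;
    final long bufferId;
    int consumedCount; // records handed out under this lease

    Lease(String shardId, long bufferId) {
        this.shardId = shardId;
        this.bufferId = bufferId;
    }
}

/** Simplified ShardBuffer: one record list plus a counter standing in for the checkpoint. */
final class ShardBuffer {
    private static final AtomicLong NEXT_ID = new AtomicLong();

    final long id = NEXT_ID.incrementAndGet();
    final List<String> records = new ArrayList<>();
    boolean leased;
    int checkpointed;
}

/** All access goes through RecordBuffer, which validates leases before touching a ShardBuffer. */
final class RecordBuffer {
    private final Map<String, ShardBuffer> buffers = new HashMap<>();

    synchronized void assignShard(String shardId) {
        buffers.put(shardId, new ShardBuffer()); // (re)assignment recreates the buffer
    }

    synchronized void addRecords(String shardId, List<String> newRecords) {
        ShardBuffer buffer = buffers.get(shardId);
        if (buffer != null) {
            buffer.records.addAll(newRecords);
        }
    }

    /** Step 1: take an exclusive lease on a shard buffer, if it is free. */
    synchronized Optional<Lease> acquireLease(String shardId) {
        ShardBuffer buffer = buffers.get(shardId);
        if (buffer == null || buffer.leased) {
            return Optional.empty();
        }
        buffer.leased = true;
        return Optional.of(new Lease(shardId, buffer.id));
    }

    /** Step 2: read the buffered records under a still-valid lease. */
    synchronized List<String> consume(Lease lease) {
        ShardBuffer buffer = validBuffer(lease);
        if (buffer == null) {
            throw new IllegalStateException("Lease invalidated for shard " + lease.shardId);
        }
        lease.consumedCount = buffer.records.size();
        return new ArrayList<>(buffer.records);
    }

    /** Step 3: return the lease; commit on success, keep the records for a retry on failure. */
    synchronized boolean returnLease(Lease lease, boolean success) {
        ShardBuffer buffer = validBuffer(lease);
        if (buffer == null) {
            return false; // buffer was dropped or recreated: nothing may be committed
        }
        buffer.leased = false;
        if (success) {
            buffer.records.subList(0, lease.consumedCount).clear(); // drop only what was handed out
            buffer.checkpointed += lease.consumedCount;
        }
        return true;
    }

    synchronized int bufferedCount(String shardId) {
        ShardBuffer buffer = buffers.get(shardId);
        return buffer == null ? 0 : buffer.records.size();
    }

    synchronized int checkpointedCount(String shardId) {
        ShardBuffer buffer = buffers.get(shardId);
        return buffer == null ? 0 : buffer.checkpointed;
    }

    private ShardBuffer validBuffer(Lease lease) {
        ShardBuffer buffer = buffers.get(lease.shardId);
        return buffer != null && buffer.id == lease.bufferId && buffer.leased ? buffer : null;
    }
}
```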
h3. ShardBuffer structure
*ShardBuffer* is a simple buffer that contains 2 record queues. Newly received records are stored in the _PENDING_ queue. When the processor wants to consume the buffer, all records in the _PENDING_ queue are moved to the _IN_PROGRESS_ queue, and all _IN_PROGRESS_ records are returned.
On success, all records are known to have been processed, so the progress is checkpointed and all data in the _IN_PROGRESS_ queue is dropped.
On failure, it’s unclear whether the records were processed, so everything is kept as is: the records stay in the _IN_PROGRESS_ queue and processing can be retried. Because a retry may redeliver records that were in fact processed, this yields at-least-once rather than exactly-once delivery.
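A minimal sketch of the two-queue ShardBuffer, assuming a generic record type and hypothetical method names (add, consume, commit, rollback). Records that arrive while a batch is being processed stay in _PENDING_, so commit only drops what was actually handed out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Sketch of a two-queue ShardBuffer; record type and method names are hypothetical. */
final class ShardBuffer<T> {
    private final Deque<T> pending = new ArrayDeque<>();    // just received from KCL
    private final Deque<T> inProgress = new ArrayDeque<>(); // handed out, not yet checkpointed

    /** KCL hook: newly received records go to PENDING. */
    synchronized void add(List<T> records) {
        pending.addAll(records);
    }

    /** Moves PENDING into IN_PROGRESS and returns everything IN_PROGRESS, including earlier failures. */
    synchronized List<T> consume() {
        inProgress.addAll(pending);
        pending.clear();
        return new ArrayList<>(inProgress);
    }

    /** Success: everything IN_PROGRESS was processed, so it is dropped and returned for checkpointing. */
    synchronized List<T> commit() {
        List<T> committed = new ArrayList<>(inProgress);
        inProgress.clear();
        return committed;
    }

    /** Failure: IN_PROGRESS is kept as is, so the next consume() retries it. */
    synchronized void rollback() {
        // intentionally empty
    }

    synchronized int pendingSize() {
        return pending.size();
    }

    synchronized int inProgressSize() {
        return inProgress.size();
    }
}
```

The last record returned by commit is a natural checkpoint position (e.g. by its sequence number); how the real implementation checkpoints through KCL is not shown here.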
was:
Create a new version of the processor that doesn't rely on ProcessSessionFactory.
The new processor buffers all consumed records, which are written to flow files in the onTrigger method.
The new processor uses KCL v3.
> Remove reference to ProcessSessionFactory from ConsumeKinesisStream processor
> -----------------------------------------------------------------------------
>
> Key: NIFI-14727
> URL: https://issues.apache.org/jira/browse/NIFI-14727
> Project: Apache NiFi
> Issue Type: Improvement
> Components: Extensions
> Reporter: Alaksiej Ščarbaty
> Assignee: Alaksiej Ščarbaty
> Priority: Major
> Time Spent: 1h 50m
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)