[ https://issues.apache.org/jira/browse/NIFI-14358?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
David Handermann resolved NIFI-14358.
-------------------------------------
Fix Version/s: 2.4.0
Resolution: Fixed
> Add Blocking on Backpressure to ConsumeKinesisStream
> ----------------------------------------------------
>
> Key: NIFI-14358
> URL: https://issues.apache.org/jira/browse/NIFI-14358
> Project: Apache NiFi
> Issue Type: Improvement
> Components: Extensions
> Reporter: Dariusz Seweryn
> Assignee: Dariusz Seweryn
> Priority: Major
> Fix For: 2.4.0
>
> Time Spent: 2h 50m
> Remaining Estimate: 0h
>
> ConsumeKinesisStream (CKS) does not honor backpressure. Individual
> ShardProcessors create FlowFiles as soon as a batch of records is available
> and processed. This can overwhelm downstream systems that cannot process
> data quickly enough.
> NiFi expects processors to create data only when the onTrigger function is
> called. This is problematic for CKS because it uses the Kinesis Client
> Library (KCL), which is opinionated in the way it manages (creates) the
> threads that produce data.
> The standard approach to backpressure is a blocking queue that is drained
> in onTrigger. This is not ideal because:
> * The blocking queue would have no notion of the size of the outgoing
> connection (number of FlowFiles, data size), so additional properties would
> be needed to bound it.
> * KCL creates a thread for each shard it subscribes to. A single shared
> queue would create contention even in the happy-path scenario, while a
> blocking queue per shard processor would be cumbersome to track and drain
> in onTrigger.
> * Additionally, if data from different shards must be processed separately,
> a single queue would need extra logic to separate the records by shard.
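To make these trade-offs concrete, here is a minimal Java sketch of the queue-based design, assuming record payloads are plain `String`s. `ShardRecordQueue`, `putFromShard`, and `drainForTrigger` are hypothetical names, not NiFi or KCL API. The sketch shows the separately configured capacity and the single shared queue that every shard thread contends on.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical sketch of the queue-based approach (not NiFi or KCL API).
// Every KCL shard thread writes into this one queue, so they all contend on
// the single lock that ArrayBlockingQueue uses internally.
final class ShardRecordQueue {
    // Capacity is a separate setting: the queue cannot see the outgoing
    // connection's FlowFile-count or data-size backpressure thresholds.
    private final BlockingQueue<String> records;

    ShardRecordQueue(int capacity) {
        this.records = new ArrayBlockingQueue<>(capacity);
    }

    // Called from a KCL shard thread; blocks while the queue is full.
    // Returns false if the thread was interrupted while waiting.
    boolean putFromShard(String record) {
        try {
            records.put(record);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Called from onTrigger; drains up to maxRecords records to turn into FlowFiles.
    // Records from different shards arrive interleaved and would have to be
    // separated here if shards must be processed independently.
    List<String> drainForTrigger(int maxRecords) {
        List<String> drained = new ArrayList<>();
        records.drainTo(drained, maxRecords);
        return drained;
    }

    int remainingCapacity() {
        return records.remainingCapacity();
    }
}
```

Note that nothing in this class knows about the outgoing connection, which is why extra properties (here, `capacity`) would be needed.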
> An alternative approach is to allow all shard processors to work for a
> limited time period after the last onTrigger call. This approach would mean:
> * Shard processors wait when onTrigger is not called regularly (for
> example, because backpressure on the outgoing connection stops NiFi from
> scheduling the processor).
> * No blocking queues are needed, only a simple locking mechanism with
> control kept in CKS.
> * Only small changes to the current design are required.
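The time-window approach might look roughly like the following sketch. It assumes a single configurable window length and uses a `ReentrantLock` with a `Condition` as the "simple locking mechanism". The class `TriggerWindowGate` and its methods are hypothetical and do not describe the actual change made for NIFI-14358.

```java
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.LongSupplier;

// Hypothetical gate: shard processors may produce data only within a time
// window after the most recent onTrigger call. Not the actual CKS code.
final class TriggerWindowGate {
    private final long windowNanos;
    private final LongSupplier nanoClock;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition triggered = lock.newCondition();
    private boolean everTriggered;  // guarded by lock
    private long lastTriggerNanos;  // guarded by lock

    TriggerWindowGate(Duration window) {
        this(window, System::nanoTime);
    }

    // The clock is injectable so the window can be tested deterministically.
    TriggerWindowGate(Duration window, LongSupplier nanoClock) {
        this.windowNanos = window.toNanos();
        this.nanoClock = nanoClock;
    }

    // Called from onTrigger: reopen the window and wake any waiting shard threads.
    void recordTrigger() {
        lock.lock();
        try {
            lastTriggerNanos = nanoClock.getAsLong();
            everTriggered = true;
            triggered.signalAll();
        } finally {
            lock.unlock();
        }
    }

    boolean isOpen() {
        lock.lock();
        try {
            return isOpenLocked();
        } finally {
            lock.unlock();
        }
    }

    // Called from a KCL shard thread before creating FlowFiles. Waits until the
    // window is open or the timeout elapses; returns whether the window is open.
    boolean awaitOpen(long timeout, TimeUnit unit) throws InterruptedException {
        long remaining = unit.toNanos(timeout);
        lock.lock();
        try {
            while (!isOpenLocked()) {
                if (remaining <= 0L) {
                    return false;
                }
                // Only recordTrigger() can reopen a closed window, so wait for its signal.
                remaining = triggered.awaitNanos(remaining);
            }
            return true;
        } finally {
            lock.unlock();
        }
    }

    private boolean isOpenLocked() {
        // Comparing the difference stays correct even if nanoTime wraps around.
        return everTriggered && nanoClock.getAsLong() - lastTriggerNanos < windowNanos;
    }
}
```

In this sketch, onTrigger would call `recordTrigger()` on every invocation, and each shard processor would call `awaitOpen(...)` before creating FlowFiles. When backpressure stops NiFi from scheduling the processor, the window expires and shard threads wait until onTrigger runs again. Data produced inside the window can still push the connection past its threshold, so a shorter window keeps that overshoot smaller.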