[ 
https://issues.apache.org/jira/browse/NIFI-14358?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Handermann resolved NIFI-14358.
-------------------------------------
    Fix Version/s: 2.4.0
       Resolution: Fixed

> Add Blocking on Backpressure to ConsumeKinesisStream
> ----------------------------------------------------
>
>                 Key: NIFI-14358
>                 URL: https://issues.apache.org/jira/browse/NIFI-14358
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Extensions
>            Reporter: Dariusz Seweryn
>            Assignee: Dariusz Seweryn
>            Priority: Major
>             Fix For: 2.4.0
>
>          Time Spent: 2h 50m
>  Remaining Estimate: 0h
>
> ConsumeKinesisStream (CKS) does not honor backpressure. Individual 
> ShardProcessors create FlowFiles as soon as a batch of records is available 
> and processed. This may pose a problem for systems that are not quick enough 
> with processing.
> NiFi expects that processors would create data only when onTrigger function 
> is called. This is problematic with CKS due to usage of KinesisClientLibrary 
> (KCL) which is opinionated in way it manages (creates) threads that produce 
> data. 
> Standard approach for backpressure is to have a blocking queue that would be 
> drained in onTrigger. This is not ideal because:
>  * This blocking queue would have no notion of the size of the outgoing 
> connection (FlowFiles #, size) so additional properties would be needed.
>  * KCL creates a thread per each shard it subscribes to — this would create 
> contention on the blocking queue on the happy-path scenario if a single queue 
> was introduced. If a blocking queue per shard processor was introduced it 
> could be cumbersome to track all those queues to drain in onTrigger.
>  * Additionally when data from different shards should be processed 
> separately for a single queue additional complexity would be needed for 
> separation of records.
> Alternative approach would be to allow all shard processors to work for a 
> time period after the last onTrigger call. This approach would allow:
>  * Shard processors to wait when onTrigger would not be called regularly.
>  * No need for blocking queues, only simple locking mechanism with control 
> kept in CKS.
>  * Little changes to the current design.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to