Dariusz Seweryn created NIFI-14358:
--------------------------------------

             Summary: ConsumeKinesisStream backpressure honoring
                 Key: NIFI-14358
                 URL: https://issues.apache.org/jira/browse/NIFI-14358
             Project: Apache NiFi
          Issue Type: Improvement
          Components: Extensions
    Affects Versions: 2.3.0
            Reporter: Dariusz Seweryn
            Assignee: Dariusz Seweryn


ConsumeKinesisStream (CKS) does not honor backpressure. Individual
ShardProcessors create FlowFiles as soon as a batch of records is available and
processed. This may be a problem for flows whose downstream processing cannot
keep up.

NiFi expects processors to create data only when the onTrigger method is
called. This is problematic for CKS because it uses the Kinesis Client Library
(KCL), which is opinionated in the way it manages (creates) the threads that
produce data.

The standard approach to backpressure is a blocking queue that is drained in
onTrigger. This is not ideal because:
 * The blocking queue would have no notion of the limits of the outgoing
connection (FlowFile count, size), so additional properties would be needed.
 * KCL creates a thread for each shard it subscribes to. With a single queue,
this would cause contention on the queue even in the happy-path scenario. With
a queue per shard processor, tracking all the queues that need draining in
onTrigger could be cumbersome.
 * Additionally, when data from different shards must be processed separately,
a single queue would need additional complexity to keep the records apart.
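
For reference, the blocking-queue approach described above could look roughly
like the sketch below. It is an illustration only: SharedRecordBuffer, its
methods, and the capacity argument are hypothetical names, not existing NiFi or
KCL APIs, and records are simplified to strings. The capacity has to be
configured separately because the queue knows nothing about the thresholds of
the outgoing connection.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical buffer shared by all shard threads; not part of NiFi or KCL. */
final class SharedRecordBuffer {
    // The capacity is an extra setting, unrelated to the connection's own limits.
    private final BlockingQueue<String> queue;

    SharedRecordBuffer(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    /**
     * Called from a shard thread. Waits up to timeoutMillis for free space.
     * Returns false if the record was not accepted (buffer full or interrupted).
     */
    boolean offer(String record, long timeoutMillis) {
        try {
            return queue.offer(record, timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }

    /** Called from onTrigger. Removes and returns at most maxRecords records. */
    List<String> drain(int maxRecords) {
        List<String> batch = new ArrayList<>();
        queue.drainTo(batch, maxRecords);
        return batch;
    }
}
```

All shard threads call offer on the same instance, which is where the
contention mentioned above would come from.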

An alternative approach would be to allow all shard processors to work for a
time period after the last onTrigger call. With this approach:
 * Shard processors wait when onTrigger is not called regularly.
 * No blocking queues are needed, only a simple locking mechanism with control
kept in CKS.
 * Only small changes to the current design are required.
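
A minimal sketch of such a locking mechanism follows. It is an assumption
about how the idea could be expressed, not the actual change: TriggerWindowGate
and its methods are hypothetical names, the window length would have to come
from somewhere in CKS, and the clock is injected only to keep the example easy
to test. CKS would call onTrigger() from its own onTrigger, and each shard
processor would call awaitOpen() before creating FlowFiles.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.LongSupplier;

/** Hypothetical gate owned by CKS; open for a fixed window after each onTrigger. */
final class TriggerWindowGate {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition triggered = lock.newCondition();
    private final long windowNanos;
    private final LongSupplier nanoClock; // e.g. System::nanoTime
    private long lastTriggerNanos;
    private boolean everTriggered;

    TriggerWindowGate(long windowMillis, LongSupplier nanoClock) {
        this.windowNanos = windowMillis * 1_000_000L;
        this.nanoClock = nanoClock;
    }

    /** Called by the processor's onTrigger: reopens the gate and wakes waiters. */
    void onTrigger() {
        lock.lock();
        try {
            lastTriggerNanos = nanoClock.getAsLong();
            everTriggered = true;
            triggered.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /** True while the current time is within the window after the last onTrigger. */
    boolean isOpen() {
        lock.lock();
        try {
            return everTriggered
                    && nanoClock.getAsLong() - lastTriggerNanos < windowNanos;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Called by a shard processor before emitting data. Waits up to maxWaitMillis
     * for the gate to be open; returns false on timeout or interruption.
     */
    boolean awaitOpen(long maxWaitMillis) {
        long remaining = maxWaitMillis * 1_000_000L;
        lock.lock();
        try {
            while (!isOpen()) { // the lock is reentrant, so this nested call is safe
                if (remaining <= 0) {
                    return false;
                }
                remaining = triggered.awaitNanos(remaining);
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            lock.unlock();
        }
    }
}
```

When NiFi stops scheduling the processor because of backpressure, onTrigger is
no longer called, the window expires, and shard processors block in awaitOpen
instead of producing more FlowFiles.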



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
