[ 
https://issues.apache.org/jira/browse/FLINK-9374?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16486875#comment-16486875
 ] 

ASF GitHub Bot commented on FLINK-9374:
---------------------------------------

Github user fmthoma commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6021#discussion_r190154347
  
    --- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
 ---
    @@ -326,6 +342,24 @@ private void checkAndPropagateAsyncError() throws 
Exception {
                }
        }
     
    +   /**
    +    * If the internal queue of the {@link KinesisProducer} gets too long,
    +    * flush some of the records until we are below the limit again.
    +    * We don't want to flush _all_ records at this point since that would
    +    * break record aggregation.
    +    */
    +   private void checkQueueLimit() {
    +           while (producer.getOutstandingRecordsCount() >= queueLimit) {
    +                   producer.flush();
    --- End diff --
    
    @tzulitai @bowenli86 I've given this some more thought. `wait()`/`notify()` 
requires a `synchronized` block. So if we just notify some lock in the 
callback, this would lead to synchronization overhead. We'd have to recognize a 
transition from »queue size > queue limit« to »queue size <= queue limit« and 
only synchronize then, which adds a lot of complexity.
    
    On the other hand: Kinesis accepts up to 1MB per second per shard. The 
queue limit should be chosen so that some data can be accumulated still before 
sending, i.e. more than a second of data (more than 1MB per shard). If the 
queue limit is chosen adequately, then the `Thread.sleep(500)` does not harm, 
as the queued records take more than one second to flush anyway. If the queue 
limit is chosen too low, then sleeping half a second may be too long, but we 
would not reach maximum throughput anyway because of the limitation on the 
number of `Put` requests.
    
    I think it's not worth the additional complexity.


> Flink Kinesis Producer does not backpressure
> --------------------------------------------
>
>                 Key: FLINK-9374
>                 URL: https://issues.apache.org/jira/browse/FLINK-9374
>             Project: Flink
>          Issue Type: Bug
>          Components: Kinesis Connector
>            Reporter: Franz Thoma
>            Priority: Critical
>         Attachments: after.png, before.png
>
>
> The {{FlinkKinesisProducer}} just accepts records and forwards it to a 
> {{KinesisProducer}} from the Amazon Kinesis Producer Library (KPL). The KPL 
> internally holds an unbounded queue of records that have not yet been sent.
> Since Kinesis is rate-limited to 1MB per second per shard, this queue may 
> grow indefinitely if Flink sends records faster than the KPL can forward them 
> to Kinesis.
> One way to circumvent this problem is to set a record TTL, so that queued 
> records are dropped after a certain amount of time, but this will lead to 
> data loss under high loads.
> Currently the only time the queue is flushed is during checkpointing: 
> {{FlinkKinesisProducer}} consumes records at arbitrary rate, either until a 
> checkpoint is reached (and will wait until the queue is flushed), or until 
> out-of-memory, whichever is reached first. (This gets worse due to the fact 
> that the Java KPL is only a thin wrapper around a C++ process, so it is not 
> even the Java process that runs out of memory, but the C++ process.) The 
> implicit rate-limit due to checkpointing leads to a ragged throughput graph 
> like this (the periods with zero throughput are the wait times before a 
> checkpoint):
> !file:///home/fthoma/projects/flink/before.png!!before.png! Throughput 
> limited by checkpointing only
> My proposed solution is to add a config option {{queueLimit}} to set a 
> maximum number of records that may be waiting in the KPL queue. If this limit 
> is reached, the {{FlinkKinesisProducer}} should trigger a {{flush()}} and 
> wait (blocking) until the queue length is below the limit again. This 
> automatically leads to backpressuring, since the {{FlinkKinesisProducer}} 
> cannot accept records while waiting. For compatibility, {{queueLimit}} is set 
> to {{Integer.MAX_VALUE}} by default, so the behavior is unchanged unless a 
> client explicitly sets the value. Setting a »sane« default value is not 
> possible unfortunately, since sensible values for the limit depend on the 
> record size (the limit should be chosen so that about 10–100MB of records per 
> shard are accumulated before flushing, otherwise the maximum Kinesis 
> throughput may not be reached).
> !after.png! Throughput with a queue limit of 100000 records (the spikes are 
> checkpoints, where the queue is still flushed completely)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to