[
https://issues.apache.org/jira/browse/FLINK-9374?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Aljoscha Krettek updated FLINK-9374:
------------------------------------
Component/s: Kinesis Connector
> Flink Kinesis Producer does not backpressure
> --------------------------------------------
>
> Key: FLINK-9374
> URL: https://issues.apache.org/jira/browse/FLINK-9374
> Project: Flink
> Issue Type: Bug
> Components: Kinesis Connector
> Reporter: Franz Thoma
> Priority: Critical
> Attachments: after.png, before.png
>
>
> The {{FlinkKinesisProducer}} just accepts records and forwards them to a
> {{KinesisProducer}} from the Amazon Kinesis Producer Library (KPL). The KPL
> internally holds an unbounded queue of records that have not yet been sent.
> Since Kinesis is rate-limited to 1MB per second per shard, this queue may
> grow indefinitely if Flink sends records faster than the KPL can forward them
> to Kinesis.
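>
> For illustration, a minimal sketch of this forwarding path (simplified; the
> real sink's {{invoke()}} also registers a callback on the returned future).
> Note that {{addUserRecord()}} only enqueues the record in the KPL's internal,
> unbounded queue and returns immediately, so nothing on this path ever blocks:
> {code:java}
> // Inside the sink's per-record path: hand the serialized record to the KPL.
> // addUserRecord() is asynchronous -- the record lands in the KPL's internal
> // queue, and the returned future completes once it is actually sent.
> ListenableFuture<UserRecordResult> future = kinesisProducer.addUserRecord(
>     streamName, partitionKey, ByteBuffer.wrap(serializedRecord));
> {code}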
> One way to circumvent this problem is to set a record TTL, so that queued
> records are dropped after a certain amount of time, but this will lead to
> data loss under high loads.
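>
> As a sketch of that TTL workaround (the TTL value below is only
> illustrative, it matches the KPL's default of 30 seconds):
> {code:java}
> // Cap how long records may sit in the KPL queue; records that expire in the
> // queue are dropped, which is exactly the data loss described above.
> KinesisProducerConfiguration config = new KinesisProducerConfiguration()
>     .setRecordTtl(30000); // milliseconds
> KinesisProducer kinesisProducer = new KinesisProducer(config);
> {code}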
> Currently the only time the queue is flushed is during checkpointing:
> {{FlinkKinesisProducer}} consumes records at an arbitrary rate, either until
> a checkpoint is reached (at which point it blocks until the queue is
> flushed), or until out-of-memory, whichever comes first. (This is made worse
> by the fact that the Java KPL is only a thin wrapper around a C++ process,
> so it is not even the Java process that runs out of memory, but the C++
> process.) The implicit rate limit due to checkpointing leads to a ragged
> throughput graph like this (the periods with zero throughput are the wait
> times before a checkpoint):
> !before.png! Throughput limited by checkpointing only
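>
> A sketch of this sole flush point, assuming Flink's
> {{CheckpointedFunction}} interface (details simplified):
> {code:java}
> // At a checkpoint barrier the sink blocks until the KPL queue has drained.
> // flushSync() returns only once getOutstandingRecordsCount() reaches zero,
> // which is what produces the zero-throughput gaps before each checkpoint.
> @Override
> public void snapshotState(FunctionSnapshotContext context) throws Exception {
>     kinesisProducer.flushSync();
> }
> {code}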
> My proposed solution is to add a config option {{queueLimit}} to set a
> maximum number of records that may be waiting in the KPL queue. If this limit
> is reached, the {{FlinkKinesisProducer}} should trigger a {{flush()}} and
> block until the queue length is below the limit again. This automatically
> leads to backpressure, since the {{FlinkKinesisProducer}} cannot accept
> records while waiting. For compatibility, {{queueLimit}} defaults to
> {{Integer.MAX_VALUE}}, so the behavior is unchanged unless a client
> explicitly sets the value. Unfortunately, setting a "sane" default value is
> not possible, since sensible values for the limit depend on the record size:
> the limit should be chosen so that about 10–100MB of records per shard
> accumulate before flushing, otherwise the maximum Kinesis throughput may not
> be reached. A sketch of the enforcement logic follows below.
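>
> A minimal sketch of the proposed enforcement, assuming a hypothetical
> {{queueLimit}} field and using the KPL's {{getOutstandingRecordsCount()}}:
> {code:java}
> // Called before accepting each new record: block (and thereby backpressure
> // the upstream operators) until the KPL queue is below the limit again.
> private void enforceQueueLimit() throws InterruptedException {
>     while (kinesisProducer.getOutstandingRecordsCount() >= queueLimit) {
>         kinesisProducer.flush(); // ask the KPL to send what it has queued
>         Thread.sleep(500);       // then wait for the queue to drain a bit
>     }
> }
> {code}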
> !after.png! Throughput with a queue limit of 100000 records (the spikes are
> checkpoints, where the queue is still flushed completely)
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)