[
https://issues.apache.org/jira/browse/FLINK-32116?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Danny Cranmer updated FLINK-32116:
----------------------------------
Affects Version/s: aws-connector-4.2.0
(was: aws-connector-4.1.0)
> FlinkKinesisConsumer cannot stop-with-savepoint when configured with
> watermark assigner and watermark tracker
> -------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-32116
> URL: https://issues.apache.org/jira/browse/FLINK-32116
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kinesis
> Affects Versions: 1.16.1, 1.15.4, aws-connector-4.2.0
> Reporter: Hong Liang Teoh
> Assignee: Aleksandr Pilipenko
> Priority: Major
> Fix For: aws-connector-4.3.0
>
>
> Problem:
> When FlinkKinesisConsumer is configured with the legacy watermarking system, it
> is unable to take a savepoint during stop-with-savepoint and gets stuck
> indefinitely.
>
>
> {code:java}
> FlinkKinesisConsumer<String> src = new FlinkKinesisConsumer<>(
>         "YourStreamHere", new SimpleStringSchema(), consumerConfig);
> // Set up watermark assigner on Kinesis source
> src.setPeriodicWatermarkAssigner(...);
> // Set up watermark tracker on Kinesis source
> src.setWatermarkTracker(...);{code}
>
>
> *Why does it get stuck?*
> When watermarks are set up, the `shardConsumer` and `recordEmitter` threads
> communicate through an asynchronous queue.
> On stop-with-savepoint, shardConsumer waits for the queue to empty before
> continuing, but recordEmitter is terminated before the queue is empty. As a
> result, the queue never drains and the application gets stuck indefinitely.
>
> *Workarounds*
> Use the new watermark framework
> {code:java}
> FlinkKinesisConsumer<String> src = new FlinkKinesisConsumer<>(
>         "YourStreamHere", new SimpleStringSchema(), consumerConfig);
> env.addSource(src)
>         // Set up watermark strategy with both watermark assigner and watermark tracker
>         .assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps());{code}
>
>
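The hang described under "Why does it get stuck?" can be reproduced in miniature with plain JDK threads. The sketch below is not the connector's code. The class `QueueDrainSketch`, the method `producerSeesEmptyQueue`, and the bounded wait are hypothetical, introduced only so the example terminates. It assumes the shardConsumer side polls for an empty queue while a recordEmitter-like thread is the only consumer.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class QueueDrainSketch {

    /**
     * Returns true if the waiting side observed an empty queue within timeoutMillis.
     * emitterStopsFirst = true models the problematic ordering from the report:
     * the only consumer of the queue is stopped while records are still queued.
     */
    public static boolean producerSeesEmptyQueue(boolean emitterStopsFirst, long timeoutMillis) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < 100; i++) {
            queue.add("record-" + i); // unbounded queue, add() always succeeds
        }

        // Stand-in for the recordEmitter thread: drains the queue while running.
        AtomicBoolean running = new AtomicBoolean(!emitterStopsFirst);
        Thread emitter = new Thread(() -> {
            try {
                while (running.get()) {
                    queue.poll(10, TimeUnit.MILLISECONDS);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        emitter.start();

        // Stand-in for shardConsumer: wait for the queue to empty.
        // The real wait is unbounded; a deadline is used here so the sketch ends.
        boolean drained = false;
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        try {
            while (System.nanoTime() < deadline) {
                if (queue.isEmpty()) {
                    drained = true;
                    break;
                }
                Thread.sleep(5);
            }
            running.set(false);
            emitter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return drained;
    }

    public static void main(String[] args) {
        System.out.println("emitter alive while waiting: drained = "
                + producerSeesEmptyQueue(false, 2000));
        System.out.println("emitter stopped first:       drained = "
                + producerSeesEmptyQueue(true, 200));
    }
}
```

With the emitter still running, the waiting side sees the queue drain. With the emitter stopped first, the queue keeps its records and the wait only ends because of the artificial deadline, which mirrors the indefinite hang when no deadline exists.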
--
This message was sent by Atlassian Jira
(v8.20.10#820010)