Hong Liang Teoh created FLINK-32116:
---------------------------------------
Summary: FlinkKinesisConsumer cannot stop-with-savepoint when
configured with watermark assigner and watermark tracker
Key: FLINK-32116
URL: https://issues.apache.org/jira/browse/FLINK-32116
Project: Flink
Issue Type: Bug
Components: Connectors / Kinesis
Affects Versions: 1.15.4, 1.16.1, 1.17.0
Reporter: Hong Liang Teoh
Problem:
When FlinkKinesisConsumer is configured with the legacy watermarking system
(a watermark assigner and a watermark tracker set directly on the source), it
cannot take a savepoint during stop-with-savepoint, and the job gets stuck
indefinitely.
{code:java}
FlinkKinesisConsumer<String> src =
    new FlinkKinesisConsumer<>("YourStreamHere", new SimpleStringSchema(), consumerConfig);
// Set up watermark assigner on Kinesis source
src.setPeriodicWatermarkAssigner(...);
// Set up watermark tracker on Kinesis source
src.setWatermarkTracker(...);
{code}
*Why does it get stuck?*
When watermarks are set up this way, the `shardConsumer` and `recordEmitter`
threads communicate through an asynchronous queue.
On stop-with-savepoint, the shardConsumer waits for the queue to empty before
continuing. However, the recordEmitter is terminated before the queue is empty.
As a result, the queue never drains, and the application gets stuck indefinitely.
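The hang can be illustrated in miniature without Flink. The following is only a sketch of the pattern described above, not the connector's actual code: the class `QueueDrainHang`, the methods `simulate` and `waitUntilEmpty`, the record count, and the timeout are all hypothetical. A bounded wait stands in for the real indefinite wait so the example terminates.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the shardConsumer / recordEmitter interaction.
final class QueueDrainHang {

    /** Polls until the queue is empty or the timeout elapses; returns true if it drained. */
    static boolean waitUntilEmpty(BlockingQueue<String> queue, long timeoutMillis)
            throws InterruptedException {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (!queue.isEmpty()) {
            if (System.nanoTime() >= deadline) {
                return false; // the real code has no deadline, so it would wait forever
            }
            Thread.sleep(10);
        }
        return true;
    }

    /** Runs one shutdown scenario; returns true if the queue drained before the timeout. */
    static boolean simulate(boolean stopEmitterEarly) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < 20; i++) {
            queue.add("record-" + i);
        }

        // "recordEmitter": the only thread that removes records from the queue.
        Thread emitter = new Thread(() -> {
            try {
                while (true) {
                    queue.take();
                    Thread.sleep(5); // simulate per-record emit work
                }
            } catch (InterruptedException e) {
                // interrupted: the emitter stops consuming
            }
        });
        emitter.setDaemon(true);
        emitter.start();

        if (stopEmitterEarly) {
            // Shutdown order from the report: emitter terminated while records remain.
            emitter.interrupt();
            emitter.join();
        }

        // "shardConsumer": waits for the queue to empty before continuing.
        boolean drained = waitUntilEmpty(queue, 2000);

        emitter.interrupt();
        emitter.join();
        return drained;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("emitter kept running:  drained=" + simulate(false));
        System.out.println("emitter stopped early: drained=" + simulate(true));
    }
}
```

When the emitter keeps running, the wait should complete. When it is stopped first, nothing consumes the remaining records, so the wait can only end through the artificial timeout.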
*Workarounds*
Use the new watermark framework:
{code:java}
FlinkKinesisConsumer<String> src =
    new FlinkKinesisConsumer<>("YourStreamHere", new SimpleStringSchema(), consumerConfig);
env.addSource(src)
    // Set up watermark strategy with both watermark assigner and watermark tracker
    .assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps());
{code}