[ 
https://issues.apache.org/jira/browse/FLINK-35815?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Krzysztof Dziolak updated FLINK-35815:
--------------------------------------
    Affects Version/s: 1.19.1
                           (was: 1.16.1)
                           (was: 1.15.4)

> KinesisProxySyncV2 doesn't always retry throttling exceptions. 
> ---------------------------------------------------------------
>
>                 Key: FLINK-35815
>                 URL: https://issues.apache.org/jira/browse/FLINK-35815
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kinesis
>    Affects Versions: aws-connector-4.2.0, aws-connector-4.3.0, 1.19.1
>            Reporter: Krzysztof Dziolak
>            Assignee: Aleksandr Pilipenko
>            Priority: Major
>             Fix For: aws-connector-4.4.0
>
>
> Problem:
> When FlinkKinesisConsumer is configured with the legacy watermarking system, 
> it cannot take a savepoint during stop-with-savepoint and gets stuck 
> indefinitely.
>  
>  
> {code:java}
> FlinkKinesisConsumer<String> src = new FlinkKinesisConsumer<>("YourStreamHere", new SimpleStringSchema(), consumerConfig);
> // Set up watermark assigner on Kinesis source
> src.setPeriodicWatermarkAssigner(...);
> // Set up watermark tracker on Kinesis source
> src.setWatermarkTracker(...);{code}
>  
>  
> *Why does it get stuck?*
> When watermarks are set up, the `shardConsumer` and `recordEmitter` threads 
> communicate through an asynchronous queue.
> On stop-with-savepoint, `shardConsumer` waits for the queue to empty before 
> continuing, but `recordEmitter` is terminated before the queue is empty. The 
> queue therefore never empties, and the application gets stuck indefinitely.
>  
> *Workarounds*
> Use the new watermark framework (`WatermarkStrategy`) instead:
> {code:java}
> FlinkKinesisConsumer<String> src = new FlinkKinesisConsumer<>("YourStreamHere", new SimpleStringSchema(), consumerConfig);
> env.addSource(src)
>     // Set up watermark strategy with both watermark assigner and watermark tracker
>     .assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps());{code}
>  
>  
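
The hang described under "Why does it get stuck?" can be sketched with plain JDK threads. This is a minimal illustration under stated assumptions, not the connector's code: the class `DrainDeadlockSketch`, the method `queueDrains`, the record names, and the one-second timeout are all hypothetical. The timeout exists only so the demo terminates; the wait described in the issue has no such bound, which is why the job hangs.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class DrainDeadlockSketch {

    /**
     * Returns true if the producer saw the queue drain within the timeout.
     * A false result corresponds to the indefinite hang described in the issue.
     */
    public static boolean queueDrains(boolean emitterStoppedFirst) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();

        // Hypothetical stand-in for the recordEmitter thread: takes records until interrupted.
        Thread emitter = new Thread(() -> {
            try {
                while (true) {
                    queue.take();
                }
            } catch (InterruptedException e) {
                // Terminated: anything still queued (or queued later) is never consumed.
            }
        });
        emitter.setDaemon(true);
        emitter.start();

        try {
            if (emitterStoppedFirst) {
                emitter.interrupt();
                emitter.join();
            }
            // Hypothetical stand-in for shardConsumer: hand over records, then wait for an empty queue.
            for (int i = 0; i < 3; i++) {
                queue.put("record-" + i);
            }
            long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(1000);
            while (!queue.isEmpty()) {
                if (System.nanoTime() - deadline > 0) {
                    return false; // nobody is draining the queue
                }
                Thread.sleep(5);
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            emitter.interrupt(); // stop the stand-in emitter in either case
        }
    }

    public static void main(String[] args) {
        System.out.println("emitter alive:   drained=" + queueDrains(false));
        System.out.println("emitter stopped: drained=" + queueDrains(true));
    }
}
```

With a live emitter the wait completes; once the emitter has been terminated, the queue stays non-empty and only the artificial timeout ends the wait.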



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
