[ 
https://issues.apache.org/jira/browse/FLINK-22326?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17326168#comment-17326168
 ] 

Lu Niu commented on FLINK-22326:
--------------------------------

Hi [~Thesharing], we confirmed this is a bug in the current Flink version.

Problem:
Jobs containing an Iterate operator are not able to checkpoint when the feedback
QPS is low.

Because the feedback QPS is low, we want to set
[maxWaitTimeMillis|https://github.com/qqibrow/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java#L570]
 to several minutes; otherwise the IterationSource switches from RUNNING to
FINISHED and the checkpoint is aborted with "abort the checkpoint Failure
reason: Not all required tasks are currently running."

However, when maxWaitTimeMillis is set to minutes, or [not set at 
all|https://github.com/qqibrow/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java#L534],
 StreamIterationHead blocks on 
[polling|https://github.com/qqibrow/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java#L72].
 This blocks checkpointing, because after the threading-model change in
StreamTask the checkpoint is executed in the very same thread. Eventually, the
checkpoint times out.
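The following simplified, single-threaded sketch illustrates why a blocking poll starves checkpointing. It is not Flink's mailbox implementation: the mailbox is just an ArrayDeque<Runnable>, a checkpoint trigger is modeled as a mail, and MailboxBlockingSketch and checkpointDelayMillis are hypothetical names. To keep the result deterministic, we assume the trigger arrives right before the thread enters the feedback poll.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical simplification: mails (e.g. checkpoint triggers) and the default
// action (polling the feedback queue) run on one and the same thread.
public class MailboxBlockingSketch {

    /** Returns how many ms a checkpoint mail waited before the task thread ran it. */
    static long checkpointDelayMillis(long pollTimeoutMillis) {
        BlockingQueue<Long> feedback = new LinkedBlockingQueue<>(); // empty: low feedback QPS
        Queue<Runnable> mailbox = new ArrayDeque<>();
        long[] executedAt = {-1L};

        // 1. Drain pending mails (none yet), as the loop does before the default action.
        drain(mailbox);

        // 2. A checkpoint trigger arrives just as the thread enters the default action.
        long requestedAt = System.nanoTime();
        mailbox.add(() -> executedAt[0] = System.nanoTime());

        // 3. Default action: blocking poll on the feedback queue; nothing else runs meanwhile.
        try {
            feedback.poll(pollTimeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

        // 4. Only now does the loop get back to the mailbox and run the checkpoint.
        drain(mailbox);
        return TimeUnit.NANOSECONDS.toMillis(executedAt[0] - requestedAt);
    }

    private static void drain(Queue<Runnable> mailbox) {
        Runnable mail;
        while ((mail = mailbox.poll()) != null) {
            mail.run();
        }
    }

    public static void main(String[] args) {
        // A 200 ms poll delays the checkpoint by at least 200 ms; with
        // maxWaitTimeMillis in minutes it would wait for minutes and time out.
        System.out.println(checkpointDelayMillis(200) >= 200); // true
    }
}
```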

Hence, regardless of the value of 
[maxWaitTimeMillis|https://github.com/qqibrow/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java#L570],
 checkpoints fail when the feedback QPS is low.

 

Proposal:
Can we introduce another config option that closes the source if there is no
input for X time?
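One possible reading of this proposal, sketched in plain Java under our own assumptions: keep each poll short so the task thread keeps returning to its mailbox (and can take checkpoints), and track idle time separately so the source only closes after X time without input. IdleTimeoutPoller, pollSliceMillis and idleTimeoutMillis are hypothetical names for illustration, not existing Flink options; the injected clock only exists to make the example deterministic.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

// Hypothetical sketch: decouple the per-poll wait from the idle timeout that closes the source.
public class IdleTimeoutPoller<T> {

    enum Result { RECORD, IDLE, FINISHED }

    private final BlockingQueue<T> feedback;
    private final long pollSliceMillis;   // short wait per call, keeps the thread responsive
    private final long idleTimeoutMillis; // proposed "close if no input in X time"
    private final LongSupplier clockMillis;
    private long lastInputAt;
    private T lastRecord;

    IdleTimeoutPoller(BlockingQueue<T> feedback, long pollSliceMillis,
                      long idleTimeoutMillis, LongSupplier clockMillis) {
        this.feedback = feedback;
        this.pollSliceMillis = pollSliceMillis;
        this.idleTimeoutMillis = idleTimeoutMillis;
        this.clockMillis = clockMillis;
        this.lastInputAt = clockMillis.getAsLong();
    }

    /** Waits at most pollSliceMillis, then returns so the caller can process mails. */
    Result pollOnce() {
        T record = null;
        try {
            record = feedback.poll(pollSliceMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        long now = clockMillis.getAsLong();
        if (record != null) {
            lastRecord = record;
            lastInputAt = now; // any input resets the idle timer
            return Result.RECORD;
        }
        return now - lastInputAt >= idleTimeoutMillis ? Result.FINISHED : Result.IDLE;
    }

    T lastRecord() {
        return lastRecord;
    }

    public static void main(String[] args) {
        long[] fakeNow = {0L}; // manual clock so the example is deterministic
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        IdleTimeoutPoller<String> poller =
                new IdleTimeoutPoller<>(queue, 5, 60_000, () -> fakeNow[0]);

        System.out.println(poller.pollOnce()); // IDLE: no input yet, but the thread is free again
        queue.add("feedback");
        System.out.println(poller.pollOnce()); // RECORD
        fakeNow[0] += 60_000;                  // one idle minute passes
        System.out.println(poller.pollOnce()); // FINISHED
    }
}
```

The design choice here is that a checkpoint never waits longer than one short poll slice, while the decision to close the source depends only on the separate idle timeout.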

cc [~trohrmann] 

> Job contains Iterate Operator always fails on Checkpoint 
> ---------------------------------------------------------
>
>                 Key: FLINK-22326
>                 URL: https://issues.apache.org/jira/browse/FLINK-22326
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.11.1
>            Reporter: Lu Niu
>            Priority: Major
>         Attachments: Screen Shot 2021-04-16 at 12.40.34 PM.png, Screen Shot 2021-04-16 at 12.43.38 PM.png
>
>
> A job containing an Iterate operator always fails on checkpoint.
> How to reproduce: 
> [https://gist.github.com/qqibrow/f297babadb0bb662ee398b9088870785]
> This is based on 
> [https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java]
> with a few differences:
> 1. Make maxWaitTime large enough when creating the IterativeStream.
> 2. No output is sent back to the iterative source.
> Result:
> The same code is able to checkpoint in 1.9.1
> !Screen Shot 2021-04-16 at 12.43.38 PM.png!
>  
> but always fails to checkpoint in 1.11:
> !Screen Shot 2021-04-16 at 12.40.34 PM.png!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
