[ https://issues.apache.org/jira/browse/FLINK-35157 ]
elon_X deleted comment on FLINK-35157:
--------------------------------
was (Author: JIRAUSER303028):
[~fanrui] Sure, I will backport this fix to 1.17, 1.18, and 1.19. Thank you :D
> Sources with watermark alignment get stuck once some subtasks finish
> --------------------------------------------------------------------
>
> Key: FLINK-35157
> URL: https://issues.apache.org/jira/browse/FLINK-35157
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Coordination
> Affects Versions: 1.17.2, 1.19.0, 1.18.1
> Reporter: Gyula Fora
> Assignee: elon_X
> Priority: Critical
> Labels: pull-request-available
> Fix For: 1.20.0
>
> Attachments: image-2024-04-24-21-36-16-146.png
>
>
> The current watermark alignment logic can easily get stuck if some subtasks
> finish while others are still running.
> The reason is that a finished source subtask is not excluded from
> alignment: its last reported watermark stays in the alignment group, which
> prevents the rest of the job from progressing beyond that watermark plus the
> maximum allowed drift.
> This can be easily reproduced by the following simple pipeline:
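> {noformat}
> import java.time.Duration;
> import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
> import org.apache.flink.api.common.eventtime.WatermarkStrategy;
> import org.apache.flink.api.common.functions.FilterFunction;
> import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(2);
> // Align watermarks in group "g1": max drift 10 ms, alignment update interval 2 s.
> WatermarkStrategy<Long> strategy = WatermarkStrategy.<Long>forMonotonousTimestamps()
>         .withTimestampAssigner((SerializableTimestampAssigner<Long>) (value, ts) -> value)
>         .withWatermarkAlignment("g1", Duration.ofMillis(10), Duration.ofSeconds(2));
> DataStream<Long> s = env
>         .fromSource(new NumberSequenceSource(0, 100), strategy, "Sequence Source")
>         // Sleep on every record to slow the pipeline down.
>         .filter((FilterFunction<Long>) value -> {
>             Thread.sleep(200);
>             return true;
>         });
> s.print();
> env.execute();
> {noformat}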
> One possible solution is to send out a max watermark event once a source
> subtask finishes, or to exclude finished subtasks from alignment in the source coordinator.
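The two proposed fixes can be compared using a similar hypothetical model. This is a sketch under the same assumption (bound = minimum reported watermark + max drift). It is not the change that was merged for FLINK-35157, and all names are illustrative. Option 1 records `Long.MAX_VALUE` for a finishing subtask. Option 2 removes the subtask from the group.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical model of the two fixes proposed in the issue (NOT Flink's real code).
 * Assumption: bound = min(reported watermarks) + maxDrift.
 */
public class AlignmentFixSketch {
    private final long maxDriftMillis;
    private final Map<Integer, Long> reported = new HashMap<>();

    public AlignmentFixSketch(long maxDriftMillis) {
        this.maxDriftMillis = maxDriftMillis;
    }

    public void report(int subtask, long watermark) {
        reported.put(subtask, watermark);
    }

    /** Option 1: a finishing subtask emits a max watermark (Long.MAX_VALUE). */
    public void finishWithMaxWatermark(int subtask) {
        reported.put(subtask, Long.MAX_VALUE);
    }

    /** Option 2: the coordinator excludes the finished subtask from alignment. */
    public void finishByExclusion(int subtask) {
        reported.remove(subtask);
    }

    public long maxAllowedWatermark() {
        long min = Long.MAX_VALUE;
        for (long wm : reported.values()) {
            min = Math.min(min, wm);
        }
        // Saturating add: a Long.MAX_VALUE minimum must not overflow to a negative bound.
        return min > Long.MAX_VALUE - maxDriftMillis ? Long.MAX_VALUE : min + maxDriftMillis;
    }

    public static void main(String[] args) {
        AlignmentFixSketch maxWm = new AlignmentFixSketch(10);
        maxWm.report(0, 49);
        maxWm.report(1, 55);
        maxWm.finishWithMaxWatermark(0);
        System.out.println(maxWm.maxAllowedWatermark()); // prints 65

        AlignmentFixSketch excluded = new AlignmentFixSketch(10);
        excluded.report(0, 49);
        excluded.report(1, 55);
        excluded.finishByExclusion(0);
        System.out.println(excluded.maxAllowedWatermark()); // prints 65
    }
}
```

In both variants, only the still-running subtask constrains the bound, so it can keep advancing. The saturating addition matters for option 1, because adding the drift to `Long.MAX_VALUE` would otherwise wrap around.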
--
This message was sent by Atlassian Jira
(v8.20.10#820010)