[ https://issues.apache.org/jira/browse/FLINK-35886?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ASF GitHub Bot updated FLINK-35886:
-----------------------------------
Labels: pull-request-available (was: )
> Incorrect watermark idleness timeout accounting when subtask is
> backpressured/blocked
> -------------------------------------------------------------------------------------
>
> Key: FLINK-35886
> URL: https://issues.apache.org/jira/browse/FLINK-35886
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream, Runtime / Task
> Affects Versions: 1.18.1, 1.20.0, 1.19.1
> Reporter: Piotr Nowojski
> Assignee: Piotr Nowojski
> Priority: Critical
> Labels: pull-request-available
>
> Currently, when using watermarks with idleness in Flink, idleness can be
> incorrectly detected when reading records from a source that is blocked by
> the runtime. For example, this can easily happen when the source is either
> backpressured or blocked by watermark alignment. In those cases, even though
> there are more records to be read from the source (or the source’s split),
> the runtime decides not to poll those records (or is unable to). The
> idleness timeout can then kick in and mark the source/source split as idle,
> which can lead to incorrect combined watermark calculations and to records
> being incorrectly classified as late and dropped.
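To make the failure mode concrete, here is a minimal sketch, assuming idleness is decided purely from wall-clock time since the last emitted record. `NaiveIdleTimer` and its methods are hypothetical illustrations, not Flink classes; the point is that such a timer has no way to distinguish "no data available" from "data available but not polled".

```java
// Hypothetical model of idleness detection (not Flink's actual classes).
// The timer only measures wall-clock time since the last record, so it cannot
// tell "no data" apart from "data pending but the split is blocked".
class NaiveIdleTimer {
    private final long idleTimeoutMs;
    private long lastRecordMs;

    NaiveIdleTimer(long idleTimeoutMs, long nowMs) {
        this.idleTimeoutMs = idleTimeoutMs;
        this.lastRecordMs = nowMs;
    }

    // Called whenever the split emits a record.
    void onRecord(long nowMs) {
        lastRecordMs = nowMs;
    }

    // Idle once idleTimeoutMs of wall-clock time has passed without a record.
    boolean isIdle(long nowMs) {
        return nowMs - lastRecordMs >= idleTimeoutMs;
    }

    public static void main(String[] args) {
        NaiveIdleTimer timer = new NaiveIdleTimer(10_000, 0);
        timer.onRecord(1_000);
        // From t=1s the split is blocked (backpressure or alignment) and emits
        // nothing, even though it still has pending records.
        System.out.println(timer.isIdle(5_000));   // false: only 4s since the last record
        System.out.println(timer.isIdle(11_000));  // true: marked idle although it was only blocked
    }
}
```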
> h4. Watermark alignment
> Suppose there are two source splits (partitions), A and B, and
> maxAllowedWatermarkDrift is set to 30s:
> # Partition A has emitted watermark 1042s, while partition B sits at
> watermark 1000s.
> # {{1042s - 1000s > maxAllowedWatermarkDrift}}, so partition A is blocked by
> the watermark alignment.
> # For the duration of idleTimeout, partition B emits a large batch of
> records that do not advance its watermark by much. For example, partition
> B's watermark either stays at 1000s or advances by a small amount, e.g. to
> 1005s, so partition A remains blocked.
> # idleTimeout kicks in and marks partition A as idle, because it has not
> emitted anything for that long, even though it was only blocked.
> # Partition B finishes emitting that large batch of older records, and now
> there is a gap in rowtimes: previously partition B was emitting records
> with rowtime ~1000s, now it jumps to, for example, ~5000s.
> # As partition A is idle, it no longer holds back the combined watermark,
> which jumps to ~5000s as well.
> # Watermark alignment unblocks partition A, and it continues emitting
> records with rowtime ~1042s. But now all of those records are dropped as
> late.
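The steps above can be replayed numerically. This is a simplified sketch under stated assumptions: each split's watermark is taken to be its latest rowtime (no out-of-orderness bound), the combined watermark is the minimum over non-idle splits, a record is late if its timestamp is at or below the combined watermark, and partition A's idleness is simply set once it has been blocked for idleTimeout. `AlignmentIdlenessScenario` and its helpers are hypothetical names.

```java
import java.util.Arrays;

// Hypothetical replay of the watermark-alignment scenario (values in seconds).
// Simplifications: watermark == latest rowtime per split, combined watermark ==
// minimum over splits that are not idle, "late" == timestamp <= combined watermark.
class AlignmentIdlenessScenario {
    static final long MAX_DRIFT = 30; // maxAllowedWatermarkDrift

    static long combined(long[] watermarks, boolean[] idle) {
        long min = Long.MAX_VALUE;
        for (int i = 0; i < watermarks.length; i++) {
            if (!idle[i]) {
                min = Math.min(min, watermarks[i]);
            }
        }
        return min;
    }

    // Returns the rowtimes from partition A that end up classified as late.
    static long[] run() {
        long[] wm = {1042, 1000};          // index 0 = partition A, index 1 = partition B
        boolean[] idle = {false, false};

        // Steps 1-2: A is more than MAX_DRIFT ahead, so alignment blocks it (42 > 30).
        boolean aBlocked = wm[0] - combined(wm, idle) > MAX_DRIFT;

        // Steps 3-4: B's large batch only moves it to 1005s (A still 37s ahead).
        // A emits nothing for idleTimeout while blocked, so it is marked idle.
        wm[1] = 1005;
        if (aBlocked) {
            idle[0] = true;
        }

        // Steps 5-6: B jumps to ~5000s; with A idle, the combined watermark follows B.
        wm[1] = 5000;
        long combinedWm = combined(wm, idle);

        // Step 7: A resumes with rowtimes around 1042s, all behind the combined watermark.
        long[] aRecords = {1042, 1043, 1044};
        return Arrays.stream(aRecords).filter(t -> t <= combinedWm).toArray();
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run())); // [1042, 1043, 1044]
    }
}
```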
> h4. Backpressure
> Suppose there are two SourceOperators, A and B. Due to, for example, data
> skew, it can happen that only A gets backpressured, or that A gets
> backpressured sooner. Either way, while A is backpressured and B is not, A
> can be marked as idle, and B can bump the combined watermark high enough
> that, once the backpressure recedes, fresh records from A are considered
> late, leading to incorrect results.
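Both cases share the same root cause: the idleness timeout keeps counting while the source is not allowed to emit. One possible accounting, sketched here as an assumption and not necessarily what the linked pull request implements, is to pause the idleness clock whenever the source is backpressured or blocked by watermark alignment. `PausableIdleTimer` and its methods are hypothetical.

```java
// Hypothetical sketch of one way to fix the accounting (not Flink's API and not
// necessarily the linked PR's approach): the idle timer only accumulates time
// while the source is actually allowed to emit. Time spent backpressured or
// blocked by watermark alignment is excluded.
class PausableIdleTimer {
    private final long idleTimeoutMs;
    private long activeMsSinceRecord = 0; // unblocked time accumulated since the last record
    private long lastTickMs;
    private boolean blocked = false;

    PausableIdleTimer(long idleTimeoutMs, long nowMs) {
        this.idleTimeoutMs = idleTimeoutMs;
        this.lastTickMs = nowMs;
    }

    // Fold the elapsed time into the counter, but only if we were not blocked.
    private void advance(long nowMs) {
        if (!blocked) {
            activeMsSinceRecord += nowMs - lastTickMs;
        }
        lastTickMs = nowMs;
    }

    // Called when backpressure or alignment blocking starts or ends.
    void setBlocked(boolean isBlocked, long nowMs) {
        advance(nowMs);
        blocked = isBlocked;
    }

    void onRecord(long nowMs) {
        advance(nowMs);
        activeMsSinceRecord = 0;
    }

    // Note: also folds elapsed time into the counter before checking.
    boolean isIdle(long nowMs) {
        advance(nowMs);
        return activeMsSinceRecord >= idleTimeoutMs;
    }

    public static void main(String[] args) {
        PausableIdleTimer timer = new PausableIdleTimer(10_000, 0);
        timer.onRecord(1_000);
        timer.setBlocked(true, 2_000);             // A becomes backpressured at t=2s
        System.out.println(timer.isIdle(60_000));  // false: only 1s of unblocked time
        timer.setBlocked(false, 60_000);           // backpressure recedes
        System.out.println(timer.isIdle(68_000));  // false: 1s + 8s = 9s unblocked
        System.out.println(timer.isIdle(69_000));  // true: 10s unblocked with no records
    }
}
```

With this accounting a genuinely empty source still becomes idle after idleTimeout of unblocked time, while a source that merely could not emit keeps holding back the combined watermark.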
--
This message was sent by Atlassian Jira
(v8.20.10#820010)