[
https://issues.apache.org/jira/browse/FLINK-34252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17812081#comment-17812081
]
David Christle edited comment on FLINK-34252 at 1/29/24 11:08 PM:
------------------------------------------------------------------
[~martijnvisser] The watermark status flips between active/idle/active/idle
even when the data stream is _not_ idle. The static assignment, temporary
no-data, and dynamic assignment cases are all cases where a stream should
actually be signaled as "idle" in some way. But here, a stream that is active
is erroneously marked idle.
The `WatermarkAssignerOperator` isn't a Source, so it does not know which of
the three cases is causing idleness. To detect idleness, the Operator takes
`idleTimeout` as an argument, and compares it against the time elapsed since
the processing timestamp of the last record it received. It appears the
Operator _should_ work like this: if no record arrives for longer than
`idleTimeout`, it infers the stream is idle, and emits `WatermarkStatus.IDLE`.
This logic is like how `withIdleness` works.
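To make the intended behavior concrete, here is a minimal Java sketch of timeout-based idleness detection. It is not the actual `WatermarkAssignerOperator` code: the class `IdleDetector`, its `Status` enum, and the methods `onRecord`/`onTimer` are hypothetical names used only for illustration, and time is passed in explicitly instead of coming from a processing-time service.

```java
/** Hypothetical sketch of timeout-based idleness detection; not Flink's actual operator. */
final class IdleDetector {
    enum Status { ACTIVE, IDLE }

    private final long idleTimeout; // ms without records before the stream counts as idle
    private long lastRecordTime;    // processing time of the most recent record
    private boolean idle;

    IdleDetector(long idleTimeout, long startTime) {
        this.idleTimeout = idleTimeout;
        this.lastRecordTime = startTime;
    }

    /** Called for every record. Returns ACTIVE if the stream just left the idle state, else null. */
    Status onRecord(long now) {
        lastRecordTime = now; // refreshed on every record, not only on a status change
        if (idle) {
            idle = false;
            return Status.ACTIVE;
        }
        return null;
    }

    /** Called periodically. Returns IDLE if the stream just became idle, else null. */
    Status onTimer(long now) {
        if (!idle && now - lastRecordTime > idleTimeout) {
            idle = true;
            return Status.IDLE;
        }
        return null;
    }

    public static void main(String[] args) {
        IdleDetector detector = new IdleDetector(100L, 0L);
        System.out.println(detector.onRecord(50L));  // record arrives while active
        System.out.println(detector.onTimer(200L));  // no record for more than 100 ms
        System.out.println(detector.onRecord(210L)); // first record after the idle period
    }
}
```

The point of the sketch is only that `lastRecordTime` is refreshed on every record, so a status change can happen solely after a real gap longer than `idleTimeout`.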
This makes sense: whether the idleness comes from a static assignment that
leaves no records to receive, a stream that doesn't produce records for a
while, or a split that happens not to be assigned for a while due to dynamic
assignment/not enough splits, all of these cases translate to the Operator
observing no records for too long. When it hasn't seen any records for longer
than `idleTimeout`, it emits `WatermarkStatus.IDLE`. I believe this signals to
all downstream operators that this sub-stream is idle & they should not wait
for any more watermarks from it.
The problem is that the code doesn't work like this. Even when records arrive
at intervals shorter than `idleTimeout`, the tracking of `lastRecordTime` is
broken. This triggers the idleness detection logic when it shouldn't: the
stream is not idle, and downstream operators _should_ wait for watermarks, but
the Operator flips back and forth between emitting
`WatermarkStatus.IDLE` and `WatermarkStatus.ACTIVE`.
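The flip-flop can be reproduced with a small stand-alone simulation. This is a simplified model based on the issue description (where `lastRecordTime` is refreshed only on the IDLE to ACTIVE transition), not the operator's real code: the class `FlipFlopSimulation`, the `refreshOnEveryRecord` switch, and the assumption that the idleness check runs right after each record are all illustrative.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical simulation of the lastRecordTime tracking; not Flink's actual operator. */
final class FlipFlopSimulation {

    /**
     * Feeds one record every recordInterval ms, runs the idleness check after each record,
     * and returns the status changes that would be emitted.
     */
    static List<String> run(
            boolean refreshOnEveryRecord, long idleTimeout, long recordInterval, int records) {
        List<String> emitted = new ArrayList<>();
        long lastRecordTime = 0L;
        boolean idle = false;
        for (int i = 1; i <= records; i++) {
            long now = i * recordInterval;
            // A record arrives.
            if (idle) {
                idle = false;
                lastRecordTime = now; // refreshed on the IDLE -> ACTIVE transition
                emitted.add("ACTIVE");
            } else if (refreshOnEveryRecord) {
                lastRecordTime = now; // the refresh that is missing in the buggy variant
            }
            // Idleness check (assumed here to run after every record).
            if (!idle && now - lastRecordTime > idleTimeout) {
                idle = true;
                emitted.add("IDLE");
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Records every 10 ms, idleTimeout of 100 ms: the stream is never really idle.
        System.out.println("buggy: " + run(false, 100L, 10L, 50));
        System.out.println("fixed: " + run(true, 100L, 10L, 50));
    }
}
```

With the refresh disabled, the elapsed time keeps growing across records until it exceeds `idleTimeout`, which is the pattern the failing test in the issue shows.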
If I understand correctly, downstream operators that receive an IDLE status
from this Operator when they shouldn't will advance their watermarks too early
(IDLE signals they should ignore this sub-stream in their watermark update
logic). This breaks the guarantees around watermarks/event time & could cause
incorrect results.
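A rough model of that downstream effect, assuming a downstream operator takes the minimum watermark over its inputs that are not idle. This is a sketch of the general idea only; the class `CombinedWatermark` and its `combine` method are hypothetical, and Flink's own watermark alignment logic is more involved.

```java
import java.util.OptionalLong;

/** Hypothetical model of combining per-input watermarks; not Flink's actual implementation. */
final class CombinedWatermark {

    /**
     * Returns the minimum watermark over the inputs that are not idle,
     * or an empty result when every input is idle.
     */
    static OptionalLong combine(long[] watermarks, boolean[] idle) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (!idle[i]) { // idle inputs are ignored
                anyActive = true;
                min = Math.min(min, watermarks[i]);
            }
        }
        return anyActive ? OptionalLong.of(min) : OptionalLong.empty();
    }

    public static void main(String[] args) {
        long[] watermarks = {1000L, 5000L};
        // Both inputs active: the slower input holds the combined watermark back.
        System.out.println(combine(watermarks, new boolean[] {false, false}));
        // Slower input wrongly marked idle: the combined watermark jumps ahead.
        System.out.println(combine(watermarks, new boolean[] {true, false}));
    }
}
```

In this model, once the slower input is wrongly marked idle, its records with timestamps between the two watermarks would arrive behind the combined watermark and could be treated as late.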
Here is a PR I submitted that might make the issue/fix clearer:
https://github.com/apache/flink/pull/24211
> WatermarkAssignerOperator should not emit WatermarkStatus.IDLE under
> continuous data flow
> -----------------------------------------------------------------------------------------
>
> Key: FLINK-34252
> URL: https://issues.apache.org/jira/browse/FLINK-34252
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Affects Versions: 1.16.3, 1.17.2, 1.18.1
> Reporter: David Christle
> Priority: Major
> Labels: pull-request-available
>
> The WatermarkAssignerOperator in the table runtime incorrectly transitions to
> an IDLE state even when data is continuously flowing. This behavior, observed
> under normal operating conditions where the interval between data elements is
> shorter than the configured idleTimeout, leads to regular transitions between
> ACTIVE and IDLE states, which are unnecessary.
> _Detail:_
> In the current implementation, the lastRecordTime variable, which tracks the
> time of the last received data element, is updated only when the
> WatermarkStatus transitions from IDLE to ACTIVE. However, it is not updated
> when WatermarkStatus is ACTIVE, which means even under continuous data flow,
> the condition `(currentTime - lastRecordTime > idleTimeout)` will eventually
> always become true, and the WatermarkStatus will erroneously be marked IDLE.
> It is unclear to me if this bug produces any incorrectness downstream, since
> when the WatermarkStatus is in the IDLE state, the next processElement
> will cause a WatermarkStatus.ACTIVE to be emitted. Nevertheless, we should
> eliminate this flip-flop behavior between states.
> The test I wrote fails without the fix and illustrates the flip-flops:
> {noformat}
> [ERROR] Tests run: 4, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 0.030
> s <<< FAILURE! -- in
> org.apache.flink.table.runtime.operators.wmassigners.WatermarkAssignerOperatorTest
> [ERROR]
> org.apache.flink.table.runtime.operators.wmassigners.WatermarkAssignerOperatorTest.testIdleStateAvoidanceWithConsistentDataFlow
> -- Time elapsed: 0.013 s <<< FAILURE!
> java.lang.AssertionError:
> Expecting
> [WatermarkStatus(IDLE),
> WatermarkStatus(ACTIVE),
> WatermarkStatus(IDLE),
> WatermarkStatus(ACTIVE),
> WatermarkStatus(IDLE),
> WatermarkStatus(ACTIVE),
> WatermarkStatus(IDLE),
> WatermarkStatus(ACTIVE),
> WatermarkStatus(IDLE)]
> not to contain
> [WatermarkStatus(IDLE)]
> but found
> [WatermarkStatus(IDLE)]
> {noformat}