David Christle created FLINK-34252:
--------------------------------------
Summary: WatermarkAssignerOperator should not emit
WatermarkStatus.IDLE under continuous data flow
Key: FLINK-34252
URL: https://issues.apache.org/jira/browse/FLINK-34252
Project: Flink
Issue Type: Bug
Components: Table SQL / Runtime
Affects Versions: 1.18.1, 1.17.2, 1.16.3
Reporter: David Christle
The WatermarkAssignerOperator in the table runtime incorrectly transitions to
the IDLE state even when data is flowing continuously. Under normal operating
conditions, where the interval between data elements is shorter than the
configured idleTimeout, the operator regularly and unnecessarily flips between
the ACTIVE and IDLE states.
_Detail:_
In the current implementation, the lastRecordTime variable, which tracks the
time of the last received data element, is updated only when the
WatermarkStatus transitions from IDLE to ACTIVE. It is not updated while the
WatermarkStatus is already ACTIVE, so even under continuous data flow the
condition `(currentTime - lastRecordTime > idleTimeout)` eventually becomes
true, and the WatermarkStatus is erroneously marked IDLE.
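To make the mechanism concrete, here is a minimal standalone sketch of the timing logic described above. It is not the actual operator code: the Tracker class, its refreshWhileActive flag, and the 1000 ms timeout with 100 ms record spacing are all assumptions chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;

final class IdleFlipFlopSketch {
    enum Status { ACTIVE, IDLE }

    /** Hypothetical, simplified stand-in for the operator's idleness tracking. */
    static final class Tracker {
        private final long idleTimeout;
        private final boolean refreshWhileActive; // false = behavior described in this issue
        private long lastRecordTime;
        private Status status = Status.ACTIVE;
        final List<Status> emitted = new ArrayList<>();

        Tracker(long idleTimeout, boolean refreshWhileActive, long startTime) {
            this.idleTimeout = idleTimeout;
            this.refreshWhileActive = refreshWhileActive;
            this.lastRecordTime = startTime;
        }

        /** Called for every incoming record. */
        void processElement(long now) {
            if (status == Status.IDLE) {
                // IDLE -> ACTIVE: the only place the buggy variant refreshes the timestamp.
                status = Status.ACTIVE;
                emitted.add(Status.ACTIVE);
                lastRecordTime = now;
            } else if (refreshWhileActive) {
                // Assumed fix: also refresh the timestamp while already ACTIVE.
                lastRecordTime = now;
            }
        }

        /** Called periodically to check for idleness. */
        void onTimer(long now) {
            if (status == Status.ACTIVE && now - lastRecordTime > idleTimeout) {
                status = Status.IDLE;
                emitted.add(Status.IDLE);
            }
        }
    }

    /** Feeds one record every 100 ms for 5 s against a 1000 ms idle timeout. */
    static List<Status> simulate(boolean refreshWhileActive) {
        Tracker tracker = new Tracker(1000L, refreshWhileActive, 0L);
        for (long now = 100L; now <= 5000L; now += 100L) {
            tracker.processElement(now);
            tracker.onTimer(now);
        }
        return tracker.emitted;
    }

    public static void main(String[] args) {
        System.out.println("without refresh: " + simulate(false));
        System.out.println("with refresh:    " + simulate(true));
    }
}
```

In this model, simulate(false) emits an alternating IDLE/ACTIVE sequence even though records arrive ten times faster than the timeout, while simulate(true) emits no status changes at all.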
It is unclear to me whether this bug produces any incorrectness downstream,
since when the WatermarkStatus is in the IDLE state, the next processElement
call causes a WatermarkStatus.ACTIVE to be emitted. Nevertheless, we should
eliminate this flip-flop behavior between states.
The test I wrote fails without the fix and illustrates the flip-flops:
{noformat}
[ERROR] Tests run: 4, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 0.030 s <<< FAILURE! -- in org.apache.flink.table.runtime.operators.wmassigners.WatermarkAssignerOperatorTest
[ERROR] org.apache.flink.table.runtime.operators.wmassigners.WatermarkAssignerOperatorTest.testIdleStateAvoidanceWithConsistentDataFlow -- Time elapsed: 0.013 s <<< FAILURE!
java.lang.AssertionError:
Expecting
[WatermarkStatus(IDLE),
WatermarkStatus(ACTIVE),
WatermarkStatus(IDLE),
WatermarkStatus(ACTIVE),
WatermarkStatus(IDLE),
WatermarkStatus(ACTIVE),
WatermarkStatus(IDLE),
WatermarkStatus(ACTIVE),
WatermarkStatus(IDLE)]
not to contain
[WatermarkStatus(IDLE)]
but found
[WatermarkStatus(IDLE)]
{noformat}