[ https://issues.apache.org/jira/browse/FLINK-18405 ]
Piotr Nowojski deleted comment on FLINK-18405:
----------------------------------------
was (Author: pnowojski):
To rephrase the problem using the concrete SQL OVER example:
the SQL OVER operator ({{RowTimeRowsBoundedPrecedingFunction}}) has the following logic:
{code:java}
@Override
public void processElement(
        RowData input,
        KeyedProcessFunction<K, RowData, RowData>.Context ctx,
        Collector<RowData> out)
        throws Exception {
    // (...)
    // triggering timestamp for trigger calculation
    long triggeringTs = input.getLong(rowTimeIdx);
    Long lastTriggeringTs = lastTriggeringTsState.value();
    if (lastTriggeringTs == null) {
        lastTriggeringTs = 0L;
    }
    // check if the data is expired, if not, save the data and register event time timer
    if (triggeringTs > lastTriggeringTs) {
        // (...)
    } else {
        numLateRecordsDropped.inc();
    }
}
{code}
The problem stated by this ticket can apply to both aligned and unaligned
checkpoints. Namely, whenever this operator/function processes any elements
after recovery but before receiving a watermark, it will consider all of those
records late and drop them.
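A minimal, Flink-free sketch that isolates the drop condition from the excerpt above, only to make the per-record check concrete. {{LateRecordFilter}} and its methods are hypothetical; the restored value of {{lastTriggeringTs}} is passed in directly, and whatever advances that state in the real function (not shown in the excerpt) is not modelled. Any replayed record whose rowtime is at or below the restored value is counted as late and discarded.

{code:java}
/**
 * Hypothetical, Flink-free model of the drop condition in processElement:
 * a record is kept only if its rowtime is strictly greater than the
 * last triggering timestamp held in state.
 */
public class LateRecordFilter {
    // Stands in for lastTriggeringTsState; null models "no value in state yet".
    private final Long lastTriggeringTs;
    // Stands in for the numLateRecordsDropped counter.
    private long numLateRecordsDropped;

    public LateRecordFilter(Long restoredLastTriggeringTs) {
        this.lastTriggeringTs = restoredLastTriggeringTs;
    }

    /** Returns true if the record would be kept, false if it is dropped as late. */
    public boolean processElement(long triggeringTs) {
        // Same defaulting as the excerpt: missing state is treated as 0.
        long last = (lastTriggeringTs == null) ? 0L : lastTriggeringTs;
        if (triggeringTs > last) {
            return true; // the real function saves the row and registers a timer here
        }
        numLateRecordsDropped++;
        return false;
    }

    public long getNumLateRecordsDropped() {
        return numLateRecordsDropped;
    }

    public static void main(String[] args) {
        // Assume the restored state holds lastTriggeringTs = 1000.
        LateRecordFilter filter = new LateRecordFilter(1_000L);
        for (long ts : new long[] {900L, 1_000L, 1_100L}) {
            System.out.println(ts + " -> " + (filter.processElement(ts) ? "kept" : "dropped"));
        }
        System.out.println("dropped: " + filter.getNumLateRecordsDropped());
    }
}
{code}

Note that a record whose timestamp equals the stored value is also dropped, because the check is a strict {{>}}.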
For aligned checkpoints, there is a workaround. After recovery, before the source
produces any record, it could emit the last emitted watermark, thus ensuring that
downstream operators receive a watermark before any elements.
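The aligned-checkpoint workaround can be sketched as a source that checkpoints the last watermark it emitted and, on restore, re-emits it before producing any new record. {{WatermarkReplayingSource}}, its methods, and the string-based output list are hypothetical illustrations, not Flink's source API.

{code:java}
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model of the aligned-checkpoint workaround: the source keeps
 * the last watermark it emitted in its checkpointed state and re-emits it
 * first after a restore. Not a Flink API.
 */
public class WatermarkReplayingSource {
    private long lastEmittedWatermark = Long.MIN_VALUE;
    // Everything sent downstream, in order, modelled as strings.
    private final List<String> output = new ArrayList<>();

    public void emitRecord(long timestamp) {
        output.add("record@" + timestamp);
    }

    /** Emits a watermark and remembers it so it can be checkpointed. */
    public void emitWatermark(long watermark) {
        lastEmittedWatermark = watermark;
        output.add("watermark@" + watermark);
    }

    /** Value that would be written into the source's checkpointed state. */
    public long snapshotLastWatermark() {
        return lastEmittedWatermark;
    }

    /** Creates a restored source that emits the checkpointed watermark before anything else. */
    public static WatermarkReplayingSource restore(long checkpointedWatermark) {
        WatermarkReplayingSource source = new WatermarkReplayingSource();
        if (checkpointedWatermark != Long.MIN_VALUE) {
            source.emitWatermark(checkpointedWatermark);
        }
        return source;
    }

    public List<String> getOutput() {
        return output;
    }

    public static void main(String[] args) {
        WatermarkReplayingSource before = new WatermarkReplayingSource();
        before.emitRecord(900);
        before.emitWatermark(1_000);
        long checkpointed = before.snapshotLastWatermark();

        WatermarkReplayingSource after = restore(checkpointed);
        after.emitRecord(1_100);
        System.out.println(after.getOutput()); // [watermark@1000, record@1100]
    }
}
{code}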
For unaligned checkpoints, we can't use that trick, because the in-flight data
is recovered and processed before anything new produced by the sources anyway
:confused:
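A toy model of why the trick fails for unaligned checkpoints, assuming a single input channel: after recovery, the downstream operator first receives the records restored from the checkpointed in-flight data and only then the restored source's output, so even a re-emitted watermark arrives after the replayed records. {{RecoveryOrder}} and its methods are hypothetical.

{code:java}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical model of the input order a downstream operator sees after
 * recovery from an unaligned checkpoint (single channel, no alignment).
 */
public class RecoveryOrder {

    public static List<String> downstreamInput(
            List<Long> recoveredInFlight, long reEmittedWatermark, List<Long> newRecords) {
        List<String> input = new ArrayList<>();
        // 1. Records restored from the checkpointed in-flight data come first.
        for (long ts : recoveredInFlight) {
            input.add("record@" + ts);
        }
        // 2. Only then the restored source's output: the re-emitted watermark...
        input.add("watermark@" + reEmittedWatermark);
        // 3. ...followed by newly produced records.
        for (long ts : newRecords) {
            input.add("record@" + ts);
        }
        return input;
    }

    /** Number of records the operator processes before it sees any watermark. */
    public static int recordsBeforeFirstWatermark(List<String> input) {
        int count = 0;
        for (String event : input) {
            if (event.startsWith("watermark@")) {
                break;
            }
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        List<String> input =
                downstreamInput(Arrays.asList(950L, 980L), 1_000L, Arrays.asList(1_100L));
        System.out.println(input);
        System.out.println("records before first watermark: " + recordsBeforeFirstWatermark(input));
    }
}
{code}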
> Add watermark support for unaligned checkpoints
> -----------------------------------------------
>
> Key: FLINK-18405
> URL: https://issues.apache.org/jira/browse/FLINK-18405
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Network
> Affects Versions: 1.12.0
> Reporter: Arvid Heise
> Priority: Not a Priority
> Labels: auto-deprioritized-major
>
> Currently, Flink generates the watermark as a first step of recovery instead
> of storing the latest watermark in the operators, to ease rescaling. With
> unaligned checkpoints, this means that on recovery Flink generates watermarks
> only after it restores the in-flight data. If your pipeline uses an operator
> that applies the latest watermark to each record, it will produce incorrect
> results during recovery unless the watermark is directly or indirectly part of
> the operator state. Thus, the SQL OVER operator should not be used with
> unaligned checkpoints, while window operators are safe to use.
> A possible solution is to store the watermark in the operator state. If
> rescaling may occur, watermarks should be stored per key-group in a
> union-state.
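A sketch of the proposed fix, under assumptions the ticket does not state: each subtask writes one (key group, watermark) entry per key group it owns into a list that is redistributed like union state (every subtask receives the full list on restore), and after rescaling a subtask conservatively takes the minimum watermark over the key groups it now owns. The key-group range arithmetic is intended to mirror Flink's contiguous key-group assignment, but {{KeyGroupWatermarks}} and all of its methods are hypothetical; a real operator would presumably keep such entries in operator state obtained via {{OperatorStateStore#getUnionListState}}.

{code:java}
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model of per-key-group watermarks kept in a union-style list.
 * Not Flink's API; plain Java only.
 */
public class KeyGroupWatermarks {

    /** One entry of the modelled union list state. */
    public static final class Entry {
        final int keyGroup;
        final long watermark;

        Entry(int keyGroup, long watermark) {
            this.keyGroup = keyGroup;
            this.watermark = watermark;
        }
    }

    /** Inclusive contiguous key-group range {start, end} owned by a subtask (assumed arithmetic). */
    public static int[] keyGroupRange(int maxParallelism, int parallelism, int subtaskIndex) {
        int start = (subtaskIndex * maxParallelism + parallelism - 1) / parallelism;
        int end = ((subtaskIndex + 1) * maxParallelism - 1) / parallelism;
        return new int[] {start, end};
    }

    /** Snapshot of one subtask: its current watermark, recorded for every key group it owns. */
    public static List<Entry> snapshot(
            int maxParallelism, int parallelism, int subtaskIndex, long watermark) {
        int[] range = keyGroupRange(maxParallelism, parallelism, subtaskIndex);
        List<Entry> entries = new ArrayList<>();
        for (int kg = range[0]; kg <= range[1]; kg++) {
            entries.add(new Entry(kg, watermark));
        }
        return entries;
    }

    /** Restore on a (possibly rescaled) subtask: minimum watermark over its key groups. */
    public static long restore(
            List<Entry> unionList, int maxParallelism, int newParallelism, int subtaskIndex) {
        int[] range = keyGroupRange(maxParallelism, newParallelism, subtaskIndex);
        long min = Long.MAX_VALUE;
        boolean found = false;
        for (Entry e : unionList) {
            if (e.keyGroup >= range[0] && e.keyGroup <= range[1]) {
                min = Math.min(min, e.watermark);
                found = true;
            }
        }
        // No entries for this subtask: behave as if no watermark was restored.
        return found ? min : Long.MIN_VALUE;
    }

    public static void main(String[] args) {
        int maxParallelism = 8;
        // Old job: parallelism 2, subtasks at watermarks 1000 and 1200.
        List<Entry> union = new ArrayList<>();
        union.addAll(snapshot(maxParallelism, 2, 0, 1_000L));
        union.addAll(snapshot(maxParallelism, 2, 1, 1_200L));
        // New job: rescaled to parallelism 3.
        for (int i = 0; i < 3; i++) {
            System.out.println("subtask " + i + " -> " + restore(union, maxParallelism, 3, i));
        }
    }
}
{code}

Taking the minimum means a subtask that inherits key groups from several old subtasks never starts ahead of any of them, at the cost of briefly lagging behind for the faster key groups.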
--
This message was sent by Atlassian Jira
(v8.20.10#820010)