[ https://issues.apache.org/jira/browse/FLINK-18405 ]


    Piotr Nowojski deleted comment on FLINK-18405:
    ----------------------------------------

was (Author: pnowojski):
Let me rephrase the problem using the concrete SQL OVER example.

SQL OVER ({{RowTimeRowsBoundedPrecedingFunction}}) has this logic:
{code:java}
    @Override
    public void processElement(
            RowData input,
            KeyedProcessFunction<K, RowData, RowData>.Context ctx,
            Collector<RowData> out)
            throws Exception {
        (...)

        // triggering timestamp for trigger calculation
        long triggeringTs = input.getLong(rowTimeIdx);

        Long lastTriggeringTs = lastTriggeringTsState.value();
        if (lastTriggeringTs == null) {
            lastTriggeringTs = 0L;
        }

        // check if the data is expired, if not, save the data and register
        // event time timer
        if (triggeringTs > lastTriggeringTs) {
            (...)
        } else {
            numLateRecordsDropped.inc();
        }
    }
{code}
The problem stated by this ticket can apply to both aligned and unaligned 
checkpoints. Namely, whenever this operator/function processes any elements 
after recovery but before receiving a watermark, it will consider all of those 
records late and drop them.

For aligned checkpoints, there is a workaround. After recovery, before the 
source produces any records, it could re-emit the last emitted watermark, thus 
ensuring that downstream operators receive a watermark before any elements.
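
To make the workaround concrete, here is a minimal sketch in plain Java. It is a toy model and not Flink API: {{ReplayingSource}}, its {{snapshot}}/{{restore}} methods and the string-based output are all names I made up for illustration. The only point is the ordering after restore: the checkpointed watermark goes out before any new record.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model (hypothetical, not Flink API): a source that re-emits its last watermark on restore. */
final class ReplayingSource {
    // Assumed to be part of the source's checkpointed state.
    private long lastEmittedWatermark = Long.MIN_VALUE;
    // Everything sent downstream, in order.
    private final List<String> output = new ArrayList<>();

    void emitRecord(long timestamp) {
        output.add("record@" + timestamp);
    }

    void emitWatermark(long watermark) {
        lastEmittedWatermark = watermark;
        output.add("watermark@" + watermark);
    }

    /** What the source would write into a checkpoint. */
    long snapshot() {
        return lastEmittedWatermark;
    }

    /** Recovery: re-emit the checkpointed watermark before producing any record. */
    static ReplayingSource restore(long checkpointedWatermark) {
        ReplayingSource source = new ReplayingSource();
        if (checkpointedWatermark != Long.MIN_VALUE) {
            source.emitWatermark(checkpointedWatermark);
        }
        return source;
    }

    List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        ReplayingSource beforeFailure = new ReplayingSource();
        beforeFailure.emitRecord(5L);
        beforeFailure.emitWatermark(10L);
        long checkpoint = beforeFailure.snapshot();

        ReplayingSource afterRecovery = ReplayingSource.restore(checkpoint);
        afterRecovery.emitRecord(12L);
        System.out.println(afterRecovery.output());
    }
}
```

In this model the recovered source's output starts with the watermark at 10, followed by the record at 12, so a downstream function sees a watermark before the first element.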

For unaligned checkpoints, we can't use that trick, because the in-flight 
data are recovered and processed before anything new produced by the sources 
:confused:

> Add watermark support for unaligned checkpoints
> -----------------------------------------------
>
>                 Key: FLINK-18405
>                 URL: https://issues.apache.org/jira/browse/FLINK-18405
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Network
>    Affects Versions: 1.12.0
>            Reporter: Arvid Heise
>            Priority: Not a Priority
>              Labels: auto-deprioritized-major
>
> Currently, Flink generates the watermark as a first step of recovery instead of 
> storing the latest watermark in the operators to ease rescaling. In unaligned 
> checkpoints, that means on recovery, Flink generates watermarks after it 
> restores in-flight data. If your pipeline uses an operator that applies the
> latest watermark on each record, it will produce incorrect results during 
> recovery if the watermark is not directly or indirectly part of the operator 
> state. Thus, the SQL OVER operator should not be used with unaligned
> checkpoints, while window operators are safe to use. 
> A possible solution is to store the watermark in the operator state. If 
> rescaling may occur, watermarks should be stored per key-group in a 
> union-state. 
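
The possible solution from the description (watermarks stored per key-group in a union-state) could look roughly like the sketch below. It is plain Java and not Flink API: the class {{KeyGroupWatermarks}}, the {{long[]}} entry layout and the contiguous key-group split in {{owns}} are assumptions made only for illustration. On snapshot, each subtask writes its watermark once per key group it owns. On restore, every subtask receives the union of all entries and keeps only those for its own key groups, which is what makes rescaling work.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model (hypothetical, not Flink API) of per-key-group watermarks kept in union state. */
final class KeyGroupWatermarks {

    /** Assumed contiguous split of key groups over subtasks, for illustration only. */
    static boolean owns(int keyGroup, int subtask, int parallelism, int maxParallelism) {
        int start = subtask * maxParallelism / parallelism;
        int end = (subtask + 1) * maxParallelism / parallelism; // exclusive
        return keyGroup >= start && keyGroup < end;
    }

    /** Snapshot: one {keyGroup, watermark} entry per key group owned by this subtask. */
    static List<long[]> snapshot(long watermark, int subtask, int parallelism, int maxParallelism) {
        List<long[]> entries = new ArrayList<>();
        for (int keyGroup = 0; keyGroup < maxParallelism; keyGroup++) {
            if (owns(keyGroup, subtask, parallelism, maxParallelism)) {
                entries.add(new long[] {keyGroup, watermark});
            }
        }
        return entries;
    }

    /** Restore: each subtask sees all entries (union semantics) and keeps its own key groups. */
    static Map<Integer, Long> restore(
            List<long[]> unionOfAllEntries, int subtask, int parallelism, int maxParallelism) {
        Map<Integer, Long> restored = new HashMap<>();
        for (long[] entry : unionOfAllEntries) {
            int keyGroup = (int) entry[0];
            if (owns(keyGroup, subtask, parallelism, maxParallelism)) {
                restored.put(keyGroup, entry[1]);
            }
        }
        return restored;
    }

    public static void main(String[] args) {
        int maxParallelism = 4;
        // Checkpoint taken with parallelism 2: subtask 0 at watermark 100, subtask 1 at 70.
        List<long[]> union = new ArrayList<>();
        union.addAll(snapshot(100L, 0, 2, maxParallelism));
        union.addAll(snapshot(70L, 1, 2, maxParallelism));

        // Recovery with parallelism 4: every subtask filters the same union.
        for (int subtask = 0; subtask < 4; subtask++) {
            System.out.println("subtask " + subtask + " -> " + restore(union, subtask, 4, maxParallelism));
        }
    }
}
```

In the example, scaling out from 2 to 4 subtasks leaves key groups 0 and 1 with watermark 100 and key groups 2 and 3 with watermark 70. How a subtask that ends up with several different restored watermarks should combine them is left open here, since the description does not specify it.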



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
