[ https://issues.apache.org/jira/browse/FLINK-29494?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Rashmin Patel updated FLINK-29494:
----------------------------------
Summary: ChangeLogNormalize operator causes unexpected firing of past
windows after state restoration (was: ChangeLogNormalize operator causes
undesired firing of past windows after state restoration)
> ChangeLogNormalize operator causes unexpected firing of past windows after
> state restoration
> --------------------------------------------------------------------------------------------
>
> Key: FLINK-29494
> URL: https://issues.apache.org/jira/browse/FLINK-29494
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka, Table SQL / Runtime
> Affects Versions: 1.14.2
> Environment: Flink version: 1.14.2
> API: Flink SQL
> Reporter: Rashmin Patel
> Priority: Critical
>
> *Issue Summary:*
> While running a GroupWindowAggregation on the stream produced by the `upsert-kafka`
> connector, I am seeing unexpected behaviour: restoring a job from a
> checkpoint/savepoint causes past windows (w.r.t. the last watermark generated by
> the previous job run) to fire.
> *Detailed Description:*
> My program is written in Flink SQL.
> Watermark strategy: bounded out-of-orderness with periodic generation
> (default interval: 200 ms)
> Rowtime field: `updated_at_ts`, which is a monotonically increasing field in the
> changelog stream produced by Debezium.
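A minimal sketch of how a periodic bounded out-of-orderness watermark behaves, assuming a simplified model rather than Flink's actual generator: it remembers the largest rowtime seen and, on each periodic tick (200 ms by default here), emits that maximum minus the allowed delay. The class name `BoundedOutOfOrdernessWatermarks` and the 5 s bound are hypothetical; the issue does not state the bound used.

```python
from datetime import datetime, timedelta
from typing import Optional


class BoundedOutOfOrdernessWatermarks:
    """Hypothetical, simplified model (not Flink's implementation) of a
    periodic bounded out-of-orderness watermark generator."""

    def __init__(self, max_out_of_orderness: timedelta) -> None:
        self.delay = max_out_of_orderness
        self.max_ts: Optional[datetime] = None  # no event seen yet

    def on_event(self, rowtime: datetime) -> None:
        # Track only the largest rowtime; out-of-order events never lower it.
        if self.max_ts is None or rowtime > self.max_ts:
            self.max_ts = rowtime

    def on_periodic_emit(self) -> Optional[datetime]:
        # Called on every periodic tick. Returns None while no event has
        # been seen, i.e. there is no watermark yet.
        if self.max_ts is None:
            return None
        return self.max_ts - self.delay


gen = BoundedOutOfOrdernessWatermarks(timedelta(seconds=5))  # hypothetical bound
gen.on_event(datetime(2022, 9, 10, 8, 30))
gen.on_event(datetime(2022, 9, 10, 8, 29))  # out of order: max stays 08:30
print(gen.on_periodic_emit())  # 2022-09-10 08:29:55
```

Note that before the first tick (or before any event) there is no watermark at all, which is the window the issue describes below.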
> Below is the runtime topology of the Flink job:
> Kafka Source (upsert mode) >> ChangeLogNormalize >> GroupWindowAggregate >>
> PostgresSink
> *Job Logic Context:*
> I am reading a CDC stream from Kafka, and the record schema looks something like
> this:
> (pk, loan_acc_no, status, created_at, *updated_at*, __op).
> I want to count the number of distinct loan_acc_no values per *hourly* window, so I
> have defined the watermark on the {{updated_at}} field and also tumble on
> {{updated_at}}.
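To make the window arithmetic concrete, here is a sketch of hourly tumbling-window assignment, assuming epoch-aligned windows with no offset and an exclusive end (so the window the issue calls "08:59:59" is [08:00, 09:00)). The helper name `tumble_window` is hypothetical and is not a Flink API.

```python
from datetime import datetime, timedelta
from typing import Tuple

EPOCH = datetime(1970, 1, 1)


def tumble_window(rowtime: datetime,
                  size: timedelta = timedelta(hours=1)) -> Tuple[datetime, datetime]:
    """Hypothetical helper: return [start, end) of the tumbling window that
    contains rowtime. Windows are aligned to the epoch, so hourly windows
    start on the hour; end is exclusive."""
    start = rowtime - (rowtime - EPOCH) % size
    return start, start + size


print(tumble_window(datetime(2022, 9, 10, 8, 45)))
print(tumble_window(datetime(2022, 9, 5)))
```

Because the window is derived solely from {{updated_at}}, an old {{updated_at}} value always maps to an old window, regardless of when the record actually arrives.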
> *Usual scenario which triggers unexpected late windows:*
> Suppose that in the previous job run the latest running window was
> {color:#0747a6}{{2022-09-10 08:59:59}}{color} (win_end time) and the job had
> processed events up to {{08:30}}.
> Now suppose that, after the job is restarted, the first CDC event received is (pk1, loan_1,
> "approved", {color:#00875a}{{2022-09-02 00:00:00}}{color},
> {color:#00875a}{{2022-09-10 08:45:00}}{color}, "u"); call it *E1*. E1 is
> not a late event w.r.t. the last watermark generated by the source operator in
> the previous job run.
> There is a ChangeLogNormalize operator between the Kafka source and the window
> operator. When the Kafka source forwards *E1* to ChangeLogNormalize, ChangeLogNormalize
> emits two records, of type -U (UPDATE_BEFORE) and +U (UPDATE_AFTER), which are
> passed as input to the window operator:
> -U (pk1, loan_1, "pending", {color:#00875a}{{2022-09-02 00:00:00}}{color},
> {color:#00875a}{{2022-09-05 00:00:00}}{color}, "u") => previous state of the
> record with key `{_}pk1{_}`
> +U (pk1, loan_1, "approved", {color:#00875a}{{2022-09-02 00:00:00}}{color},
> {color:#00875a}{{2022-09-10 08:45:00}}{color}, "u") => same as E1
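The normalization step above can be sketched as follows, assuming a simplified model of ChangeLogNormalize for upsert input: it keeps the latest row per primary key in keyed state (which is restored from the checkpoint) and turns each upsert into either an insert or a -U/+U pair. Deletes and any duplicate suppression are deliberately left out, and `ChangelogNormalizeModel` and `Row` are hypothetical names, not Flink classes.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, List, Tuple


@dataclass(frozen=True)
class Row:
    pk: str
    loan_acc_no: str
    status: str
    created_at: datetime
    updated_at: datetime


class ChangelogNormalizeModel:
    """Hypothetical, simplified model: latest row per key in keyed state;
    each upsert becomes +I (first sight of the key) or -U/+U."""

    def __init__(self) -> None:
        self.state: Dict[str, Row] = {}  # survives checkpoint/restore

    def process_upsert(self, row: Row) -> List[Tuple[str, Row]]:
        prev = self.state.get(row.pk)
        self.state[row.pk] = row
        if prev is None:
            return [("+I", row)]
        # The -U carries the *stored* row, including its old updated_at.
        return [("-U", prev), ("+U", row)]


normalize = ChangelogNormalizeModel()
old = Row("pk1", "loan_1", "pending", datetime(2022, 9, 2), datetime(2022, 9, 5))
e1 = Row("pk1", "loan_1", "approved", datetime(2022, 9, 2), datetime(2022, 9, 10, 8, 45))
normalize.process_upsert(old)         # processed in the previous run
for kind, row in normalize.process_upsert(e1):
    print(kind, row.status, row.updated_at)
```

The point the model illustrates is that the -U record's rowtime comes from state, not from the incoming event, so it can lie arbitrarily far in the past.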
> These -U events cause the problem: their {{updated_at}} can be any timestamp in
> the past, and we are tumbling on this field. With periodic watermarks, no event is
> considered late during the first watermark interval after restoration (200 ms by
> default), so these -U events create window state for past windows, and those
> windows fire as soon as the first (high) watermark arrives.
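The sequence described above can be reproduced with a small model of an event-time hourly window operator. It assumes, as the description implies, that the restored operator has no watermark until the first periodic emission (here `None`, standing in for Flink's initial minimum watermark), uses a hypothetical 5 s out-of-orderness bound, and treats a window's last timestamp as end - 1 ms. `HourlyWindowOperatorModel` is a hypothetical name; this is not Flink's WindowOperator code.

```python
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple

HOUR = timedelta(hours=1)
EPOCH = datetime(1970, 1, 1)
ONE_MS = timedelta(milliseconds=1)


class HourlyWindowOperatorModel:
    """Hypothetical, simplified event-time tumbling-window operator.
    A window is late, and fires, once the watermark reaches end - 1 ms."""

    def __init__(self, restored_watermark: Optional[datetime] = None) -> None:
        # None = "no watermark yet", the state right after restoration.
        self.watermark = restored_watermark
        self.windows: Dict[datetime, List[Tuple[str, str]]] = {}

    def process(self, kind: str, loan_acc_no: str, rowtime: datetime) -> bool:
        start = rowtime - (rowtime - EPOCH) % HOUR
        last_ts = start + HOUR - ONE_MS
        if self.watermark is not None and last_ts <= self.watermark:
            return False  # late record: dropped, no window state created
        self.windows.setdefault(start, []).append((kind, loan_acc_no))
        return True

    def on_watermark(self, watermark: datetime) -> List[datetime]:
        # Fire and clear every window whose last timestamp is covered.
        self.watermark = watermark
        fired = sorted(s for s in self.windows if s + HOUR - ONE_MS <= watermark)
        for s in fired:
            del self.windows[s]
        return fired


restored = HourlyWindowOperatorModel()  # just restored: no watermark yet
restored.process("-U", "loan_1", datetime(2022, 9, 5))  # accepted, not late
restored.process("+U", "loan_1", datetime(2022, 9, 10, 8, 45))
print(restored.on_watermark(datetime(2022, 9, 10, 8, 44, 55)))
```

For contrast, if the previous run's watermark (e.g. a hypothetical `datetime(2022, 9, 10, 8, 29, 55)`) were passed as `restored_watermark`, `process` would return `False` for the same -U record and the 2022-09-05 window would never be created.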
--
This message was sent by Atlassian Jira
(v8.20.10#820010)