[
https://issues.apache.org/jira/browse/FLINK-29849?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
ASF GitHub Bot updated FLINK-29849:
-----------------------------------
Labels: pull-request-available (was: )
> Event time temporal join on an upsert source may produce incorrect execution
> plan
> ---------------------------------------------------------------------------------
>
> Key: FLINK-29849
> URL: https://issues.apache.org/jira/browse/FLINK-29849
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.16.0, 1.15.2
> Reporter: lincoln lee
> Assignee: lincoln lee
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.17.0
>
>
> For current implementation, the execution plan is incorrect when do event
> time temporal join on an upsert source. There's two problems:
> 1. for an upsert source, we should not add a ChangelogNormalize node under a
> temporal join input, or it will damage the versions of the version table. For
> versioned tables, we use a single-temporal mechanism which relies sequencial
> records of a same key to ensure the valid period of each version, so if the
> ChangelogNormalize was added then an UB message will be produced based on the
> previous UA or Insert message, and all the columns are totally same include
> event time, e.g.,
> original upsert input
> {code}
> +I (key1, '2022-11-02 10:00:00', a1)
> +U (key1, '2022-11-02 10:01:03', a2)
> {code}
> the versioned data should be:
> {code}
> v1 [~, '2022-11-02 10:00:00')
> v2 ['2022-11-02 10:00:00', '2022-11-02 10:01:03')
> {code}
> after ChangelogNormalize's processing, will output:
> {code}
> +I (key1, '2022-11-02 10:00:00', a1)
> -U (key1, '2022-11-02 10:00:00', a1)
> +U (key1, '2022-11-02 10:01:03', a2)
> {code}
> versions are incorrect:
> {code}
> v1 ['2022-11-02 10:00:00', '2022-11-02 10:00:00') // invalid period
> v2 ['2022-11-02 10:00:00', '2022-11-02 10:01:03')
> {code}
> 2. semantically, a filter cannot be pushed into a temporal join which using
> event time, otherwise, the filter may also corrupt the versioned table
--
This message was sent by Atlassian Jira
(v8.20.10#820010)