[ https://issues.apache.org/jira/browse/FLINK-29849?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

lincoln lee updated FLINK-29849:
--------------------------------
    Description: 
In the current implementation, the execution plan is incorrect for an event
time temporal join on an upsert source. There are two problems:
1. For an upsert source, we should not add a ChangelogNormalize node under a
temporal join input, or it will corrupt the versions of the versioned table.
For versioned tables, we use a single-temporal mechanism which relies on
sequential records of the same key to determine the valid period of each
version. If a ChangelogNormalize is added, it produces a UB (-U) message based
on the previous UA (+U) or Insert message, with all columns identical to that
message, including the event time. For example, given the original upsert
input:
{code}
+I (key1, '2022-11-02 10:00:00', a1)
+U (key1, '2022-11-02 10:01:03', a2)
{code}

the versioned data should be:
{code}
v1  [~, '2022-11-02 10:00:00')
v2  ['2022-11-02 10:00:00', '2022-11-02 10:01:03')
{code}

After ChangelogNormalize processes this input, the output is:
{code}
+I (key1, '2022-11-02 10:00:00', a1)
-U (key1, '2022-11-02 10:00:00', a1)
+U (key1, '2022-11-02 10:01:03', a2)
{code}

and the resulting versions are incorrect:
{code}
v1  ['2022-11-02 10:00:00', '2022-11-02 10:00:00')  // invalid period
v2  ['2022-11-02 10:00:00', '2022-11-02 10:01:03')
{code}

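2. Semantically, a filter cannot be pushed into an event time temporal join;
otherwise, the filter may also corrupt the versioned table, because dropping a
record of a key leaves the previous version valid past the time it was replaced.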
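Both problems can be reproduced with a small simulation. This is a sketch under
stated assumptions, not Flink's planner or runtime code. It assumes that a `+I`
or `+U` record opens a version valid from its own event time until the next
record of the same key, and that a `-U` or `-D` record only closes the current
version. The names `valid_periods`, `version_at`, `filter_after_lookup`,
`filter_pushed_down`, `keep` and the sample `PRICES` data are all hypothetical.

```python
from typing import Any, List, Optional, Tuple

# Hypothetical, simplified model of a versioned table for ONE key; this is
# not Flink's actual temporal join operator.
# A record is (op, event_time, value); event times are 'HH:MM:SS' strings,
# which compare correctly as text only within a single day.
Record = Tuple[str, str, Any]
Period = Tuple[Any, str, Optional[str]]  # (value, start, end); end=None: open


def valid_periods(records: List[Record]) -> List[Period]:
    """Derive [start, end) validity periods from records in arrival order."""
    periods: List[Period] = []
    current = None  # (value, start) of the currently open version
    for op, ts, value in records:
        if current is not None:
            # The next record of the key closes the open version at its time.
            periods.append((current[0], current[1], ts))
            current = None
        if op in ("+I", "+U"):
            # Insert / update-after messages open a new version.
            current = (value, ts)
        # "-U" / "-D" only close the open version; they open nothing.
    if current is not None:
        periods.append((current[0], current[1], None))
    return periods


def version_at(records: List[Record], t: str) -> Optional[Any]:
    """Return the value valid at event time t, or None if no version is."""
    for value, start, end in valid_periods(records):
        if start <= t and (end is None or t < end):
            return value
    return None


# Problem 1: the upsert input vs. the output of ChangelogNormalize.
UPSERT_INPUT = [("+I", "10:00:00", "a1"), ("+U", "10:01:03", "a2")]
NORMALIZED = [
    ("+I", "10:00:00", "a1"),
    ("-U", "10:00:00", "a1"),  # copies the event time of the previous +I
    ("+U", "10:01:03", "a2"),
]

# Problem 2: a hypothetical filter (price < 10) on the versioned side.
PRICES = [("+I", "10:00:00", 5), ("+U", "10:01:00", 50)]


def keep(price: int) -> bool:
    return price < 10


def filter_after_lookup(records: List[Record], t: str) -> Optional[Any]:
    """Correct order: pick the version valid at t, then apply the filter."""
    value = version_at(records, t)
    return value if value is not None and keep(value) else None


def filter_pushed_down(records: List[Record], t: str) -> Optional[Any]:
    """Incorrect order: filter the changelog first, then build versions."""
    return version_at([r for r in records if keep(r[2])], t)


if __name__ == "__main__":
    print(valid_periods(UPSERT_INPUT))
    print(valid_periods(NORMALIZED))
    print(version_at(UPSERT_INPUT, "10:00:30"), version_at(NORMALIZED, "10:00:30"))
    print(filter_after_lookup(PRICES, "10:02:00"), filter_pushed_down(PRICES, "10:02:00"))
```

In this model the normalized stream gives a1 the empty period
['10:00:00', '10:00:00'), so a probe at 10:00:30 finds no version. The third
line printed is `a1 None`. The fourth line printed is `None 5`. Filtering after
the lookup correctly finds no match at 10:02:00. The pushed-down filter instead
drops the `+U` record, so the superseded price 5 is still treated as valid.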


  was:
In the current implementation, the execution plan is incorrect for an event
time temporal join on an upsert source. There are two problems:
1. For an upsert source, we should not add a ChangelogNormalize node under a
temporal join input, or it will corrupt the versions of the versioned table.
For versioned tables, we use a single-temporal mechanism which relies on
sequential records of the same key to determine the valid period of each
version. If a ChangelogNormalize is added, it produces a UB (-U) message based
on the previous UA (+U) or Insert message, with all columns identical to that
message, including the event time. For example, given the original upsert
input:
{code}
+I (key1, '2022-11-02 10:00:00', a1)
+U (key1, '2022-11-02 10:01:03', a2)
{code}

the versioned data should be:
{code}
v1  [~, '2022-11-02 10:00:00')
v2  ['2022-11-02 10:00:00', '2022-11-02 10:01:03')
{code}

After ChangelogNormalize processes this input, the output is:
{code}
+I (key1, '2022-11-02 10:00:00', a1)
-U (key1, '2022-11-02 10:00:00', a1)
+U (key1, '2022-11-02 10:01:03', a2)
{code}

and the resulting versions are incorrect:
{code}
v1  ['2022-11-02 10:00:00', '2022-11-02 10:00:00')  // invalid period
v2  ['2022-11-02 10:00:00', '2022-11-02 10:01:03')
{code}

2. Semantically, a filter cannot be pushed into a temporal join that uses
event time; otherwise, the filter may also corrupt the versioned table.



> Event time temporal join on an upsert source may produce incorrect execution plan
> ---------------------------------------------------------------------------------
>
>                 Key: FLINK-29849
>                 URL: https://issues.apache.org/jira/browse/FLINK-29849
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.16.0, 1.15.2
>            Reporter: lincoln lee
>            Assignee: lincoln lee
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.17.0



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
