lincoln lee created FLINK-29849:
-----------------------------------
Summary: Event time temporal join on an upsert source may produce
incorrect execution plan
Key: FLINK-29849
URL: https://issues.apache.org/jira/browse/FLINK-29849
Project: Flink
Issue Type: Bug
Components: Table SQL / Planner
Affects Versions: 1.15.2, 1.16.0
Reporter: lincoln lee
Fix For: 1.17.0
For current implementation, the execution plan is incorrect when do event time
temporal join on an upsert source. There's two problems:
1. for an upsert source, we should not add a ChangelogNormalize node under a
temporal join input, or it will damage the versions of the version table. For
versioned tables, we use a single-temporal mechanism which relies sequencial
records of a same key to ensure the valid period of each version, so if the
ChangelogNormalize was added then an UB message will be produced based on the
previous UA or Insert message, and all the columns are totally same include
event time, e.g.,
original upsert input
{code}
+I (key1, '2022-11-02 10:00:00', a1)
+U (key1, '2022-11-02 10:01:03', a2)
{code}
the versioned data should be:
{code}
v1 [~, '2022-11-02 10:00:00')
v2 ['2022-11-02 10:00:00', '2022-11-02 10:01:03')
{code}
after ChangelogNormalize's processing, will output:
{code}
+I (key1, '2022-11-02 10:00:00', a1)
-U (key1, '2022-11-02 10:00:00', a1)
+U (key1, '2022-11-02 10:01:03', a2)
{code}
versions are incorrect:
{code}
v1 ['2022-11-02 10:00:00', '2022-11-02 10:00:00') // invalid period
v2 ['2022-11-02 10:00:00', '2022-11-02 10:01:03')
{code}
2. semantically, a filter cannot be pushed into a temporal join which using
event time, otherwise, the filter may also corrupt the versioned table
--
This message was sent by Atlassian Jira
(v8.20.10#820010)