[ https://issues.apache.org/jira/browse/FLINK-29849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17691399#comment-17691399 ]

lincoln lee commented on FLINK-29849:
-------------------------------------

Just for the record: these are the conclusions from reviewing the semantic impact
of filter-related optimizations/rewrites on streaming scenarios (i.e., whether
filter pushdown disturbs time attributes). The following related operations were
checked:

1. Temporal join, including both event time and proctime (proctime temporal join
is not supported yet; a similar solution is lookup join). The essence is that a
filter must not be pushed through the snapshot to pre-filter the versioned data:
that can destroy the data versions (the original data for a key may change over
time) and therefore affects the result.
2. Window-related operations: the result is not affected; only the density of
watermark generation and the timing of result output are.
3. Interval join: the result is not affected (the ON condition of a left join can
be pushed down; a subsequent WHERE condition can change the join into an inner join).
4. Deduplication: the result is not affected; the current filter pushdown already
takes the 'containsOver' protection check into account.
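A minimal Python sketch of point 1, under a simplified model that is an assumption
rather than Flink's actual operator: the versioned table for one key is a list of
(event_time, value) pairs, each version valid from its own event time until the
next one. The helper names (`lookup_as_of`, `filter_after_join`,
`filter_pushed_down`) are hypothetical. It compares applying a filter after the
versioned lookup with pushing the same filter below it.

```python
from bisect import bisect_right
from typing import Callable, List, Optional, Tuple

# Simplified model (an assumption, not Flink's operator): versions of one key as
# (event_time, value) pairs sorted by event time. Each version is valid from its
# own event time until the event time of the next version.
Version = Tuple[str, str]


def lookup_as_of(versions: List[Version], probe_time: str) -> Optional[str]:
    """Return the value valid at probe_time, or None if no version exists yet."""
    times = [t for t, _ in versions]
    idx = bisect_right(times, probe_time) - 1
    return versions[idx][1] if idx >= 0 else None


def filter_after_join(versions: List[Version], probe_time: str,
                      pred: Callable[[str], bool]) -> Optional[str]:
    """Correct plan: look up the version first, then apply the filter."""
    value = lookup_as_of(versions, probe_time)
    return value if value is not None and pred(value) else None


def filter_pushed_down(versions: List[Version], probe_time: str,
                       pred: Callable[[str], bool]) -> Optional[str]:
    """Incorrect plan: pre-filter the versioned input, then look up."""
    kept = [(t, v) for t, v in versions if pred(v)]
    return lookup_as_of(kept, probe_time)


def is_a1(value: str) -> bool:
    return value == "a1"


versions = [
    ("2022-11-02 10:00:00", "a1"),
    ("2022-11-02 10:01:03", "a2"),
]
probe = "2022-11-02 10:05:00"  # a probe row arriving after a2 took effect

print(filter_after_join(versions, probe, is_a1))   # None: a2 is current and is rejected
print(filter_pushed_down(versions, probe, is_a1))  # a1: the stale version looks current
```

With the filter pushed down, the a2 version disappears before versioning, so a1
appears to remain valid after 10:01:03 and the probe at 10:05:00 wrongly matches it.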

> Event time temporal join on an upsert source may produce incorrect execution 
> plan
> ---------------------------------------------------------------------------------
>
>                 Key: FLINK-29849
>                 URL: https://issues.apache.org/jira/browse/FLINK-29849
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.16.0, 1.15.3
>            Reporter: lincoln lee
>            Assignee: lincoln lee
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.17.0, 1.16.1
>
>
> With the current implementation, the execution plan is incorrect for an event
> time temporal join on an upsert source. There are two problems:
> 1. For an upsert source, we should not add a ChangelogNormalize node under a
> temporal join input, otherwise it will damage the versions of the versioned
> table. For versioned tables, we use a single-temporal mechanism which relies on
> sequential records of the same key to determine the valid period of each
> version. If ChangelogNormalize is added, an UPDATE_BEFORE (UB) message is
> produced from the previous UPDATE_AFTER (UA) or INSERT message, with all
> columns identical, including the event time. For example,
> original upsert input:
> {code}
> +I (key1, '2022-11-02 10:00:00', a1)
> +U (key1, '2022-11-02 10:01:03', a2)
> {code}
> The versioned data should be:
> {code}
> v1  [~, '2022-11-02 10:00:00')
> v2  ['2022-11-02 10:00:00', '2022-11-02 10:01:03')
> {code}
> After ChangelogNormalize processing, the output is:
> {code}
> +I (key1, '2022-11-02 10:00:00', a1)
> -U (key1, '2022-11-02 10:00:00', a1)
> +U (key1, '2022-11-02 10:01:03', a2)
> {code}
> The resulting versions are incorrect:
> {code}
> v1  ['2022-11-02 10:00:00', '2022-11-02 10:00:00')  // invalid period
> v2  ['2022-11-02 10:00:00', '2022-11-02 10:01:03')
> {code}
> 2. Semantically, a filter cannot be pushed down through an event time temporal
> join into its versioned table input; otherwise the filter may also corrupt the
> versioned table.
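The first problem can be reproduced with a similarly simplified Python model
(again an assumption, not the planner's or operator's code). `normalize_upsert` is
a hypothetical, rough imitation of ChangelogNormalize that emits a -U carrying the
previously stored row, and `version_periods` treats every record of a key as a
boundary that closes the previous period. Deletes and multiple keys are ignored.

```python
from typing import Dict, List, Tuple

# Simplified model (an assumption): a changelog record is (kind, key, event_time, value).
Record = Tuple[str, str, str, str]
Period = Tuple[str, str, str]  # (value, start, end), meaning [start, end)


def normalize_upsert(upserts: List[Record]) -> List[Record]:
    """Rough imitation of ChangelogNormalize: emit -U built from the previous row."""
    last: Dict[str, Tuple[str, str]] = {}
    out: List[Record] = []
    for _kind, key, ts, value in upserts:
        if key in last:
            prev_ts, prev_value = last[key]
            # The -U copies every column of the stored row, including its event time.
            out.append(("-U", key, prev_ts, prev_value))
            out.append(("+U", key, ts, value))
        else:
            out.append(("+I", key, ts, value))
        last[key] = (ts, value)
    return out


def version_periods(changelog: List[Record]) -> List[Period]:
    """Derive closed version periods for a single key from sequential records."""
    periods: List[Period] = []
    prev = None
    for _kind, _key, ts, value in changelog:
        if prev is not None:
            # This record's event time ends the period opened by the previous record.
            periods.append((prev[1], prev[0], ts))
        prev = (ts, value)
    return periods


def empty_periods(periods: List[Period]) -> List[Period]:
    """Periods whose start is not before their end can never match a probe row."""
    return [p for p in periods if p[1] >= p[2]]


upsert_input: List[Record] = [
    ("+I", "key1", "2022-11-02 10:00:00", "a1"),
    ("+U", "key1", "2022-11-02 10:01:03", "a2"),
]
normalized = normalize_upsert(upsert_input)

print(version_periods(upsert_input))
# [('a1', '2022-11-02 10:00:00', '2022-11-02 10:01:03')]
print(empty_periods(version_periods(normalized)))
# [('a1', '2022-11-02 10:00:00', '2022-11-02 10:00:00')]
```

The zero-length period [10:00:00, 10:00:00) corresponds to the invalid period in
the issue description: the -U record repeats the old event time, so it closes a
version at the same instant that version started.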



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
