[ 
https://issues.apache.org/jira/browse/FLINK-34166?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17809254#comment-17809254
 ] 

lincoln lee commented on FLINK-34166:
-------------------------------------

fixed in master: 27e6ac836171c5c5539ceeb234a806be661cc30a

> KeyedLookupJoinWrapper incorrectly process delete message for inner join when 
> previous lookup result is empty
> -------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-34166
>                 URL: https://issues.apache.org/jira/browse/FLINK-34166
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.17.2, 1.18.1
>            Reporter: lincoln lee
>            Assignee: lincoln lee
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.19.0, 1.18.2
>
>
> When 'table.optimizer.non-deterministic-update.strategy' is set to
> 'TRY_RESOLVE' and the lookup join has NDU (non-deterministic update)
> problems, KeyedLookupJoinWrapper incorrectly processes a delete message
> for an inner join when the previous lookup result is empty.
> The intermediate delete result 
> {code}
>         expectedOutput.add(deleteRecord(3, "c", null, null));
> {code}
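> in the current test case
> KeyedLookupJoinHarnessTest#testTemporalInnerJoinWithFilterLookupKeyContainsPk
> is incorrect, because the inner join emitted no row for insertRecord(3, "c")
> and so there is nothing to retract: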
> {code}
>     @Test
>     public void testTemporalInnerJoinWithFilterLookupKeyContainsPk() throws Exception {
>         OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
>                 createHarness(JoinType.INNER_JOIN, FilterOnTable.WITH_FILTER, true);
>         testHarness.open();
>         testHarness.processElement(insertRecord(1, "a"));
>         testHarness.processElement(insertRecord(2, "b"));
>         testHarness.processElement(insertRecord(3, "c"));
>         testHarness.processElement(insertRecord(4, "d"));
>         testHarness.processElement(insertRecord(5, "e"));
>         testHarness.processElement(updateBeforeRecord(3, "c"));
>         testHarness.processElement(updateAfterRecord(3, "c2"));
>         testHarness.processElement(deleteRecord(3, "c2"));
>         testHarness.processElement(insertRecord(3, "c3"));
>         List<Object> expectedOutput = new ArrayList<>();
>         expectedOutput.add(insertRecord(1, "a", 1, "Julian"));
>         expectedOutput.add(insertRecord(4, "d", 4, "Fabian"));
>         expectedOutput.add(deleteRecord(3, "c", null, null));
>         expectedOutput.add(insertRecord(3, "c2", 6, "Jark-2"));
>         expectedOutput.add(deleteRecord(3, "c2", 6, "Jark-2"));
>         expectedOutput.add(insertRecord(3, "c3", 9, "Jark-3"));
>         assertor.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
>         testHarness.close();
>     }
> {code}
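
For illustration only, the intended behaviour can be sketched with a toy
model that does not use any Flink classes. Everything below is hypothetical
(the class name, the string-based rows, the map standing in for keyed state)
and is not the code of KeyedLookupJoinWrapper or of the fix commit. It only
shows one reading of the report: when the remembered lookup result for a key
is empty, an inner join should emit nothing on a retraction, while a left
outer join would retract the null-padded row it emitted earlier.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model, not Flink code: every name here is a hypothetical stand-in. */
final class LookupJoinDeleteSketch {
    // Stands in for keyed state: the lookup rows remembered per input key.
    private final Map<Integer, List<String>> previousLookup = new HashMap<>();
    private final boolean leftOuterJoin;
    private final List<String> output = new ArrayList<>();

    LookupJoinDeleteSketch(boolean leftOuterJoin) {
        this.leftOuterJoin = leftOuterJoin;
    }

    /** Handles an insert; lookupRows simulates what the dimension table returned. */
    void insert(int key, String value, List<String> lookupRows) {
        previousLookup.put(key, new ArrayList<>(lookupRows));
        emit("+I", key, value, lookupRows);
    }

    /** Handles a retraction by replaying the remembered lookup rows. */
    void delete(int key, String value) {
        List<String> rows = previousLookup.remove(key);
        if (rows == null) {
            rows = Collections.emptyList();
        }
        emit("-D", key, value, rows);
    }

    private void emit(String kind, int key, String value, List<String> rows) {
        if (rows.isEmpty()) {
            // Inner join: no row was ever emitted for this key, so emit nothing.
            // Left outer join: emit (or retract) the null-padded row.
            if (leftOuterJoin) {
                output.add(kind + "(" + key + ", " + value + ", null)");
            }
            return;
        }
        for (String row : rows) {
            output.add(kind + "(" + key + ", " + value + ", " + row + ")");
        }
    }

    List<String> output() {
        return output;
    }
}
```

In this sketch, inserting (3, "c") with an empty lookup result and then
retracting it leaves the inner-join output empty, which corresponds to
dropping deleteRecord(3, "c", null, null) from the expected output above.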



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
