[
https://issues.apache.org/jira/browse/FLINK-34166?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17809254#comment-17809254
]
lincoln lee commented on FLINK-34166:
-------------------------------------
fixed in master: 27e6ac836171c5c5539ceeb234a806be661cc30a
> KeyedLookupJoinWrapper incorrectly process delete message for inner join when
> previous lookup result is empty
> -------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-34166
> URL: https://issues.apache.org/jira/browse/FLINK-34166
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Affects Versions: 1.17.2, 1.18.1
> Reporter: lincoln lee
> Assignee: lincoln lee
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.19.0, 1.18.2
>
>
> KeyedLookupJoinWrapper(when 'table.optimizer.non-deterministic-update.strategy
> ' is set to 'TRY_RESOLVE' and the lookup join exists NDU problemns)
> incorrectly process delete message for inner join when previous lookup result
> is empty
> The intermediate delete result
> {code}
> expectedOutput.add(deleteRecord(3, "c", null, null));
> {code}
> in current case
> KeyedLookupJoinHarnessTest#testTemporalInnerJoinWithFilterLookupKeyContainsPk
> is incorrect:
> {code}
> @Test
> public void testTemporalInnerJoinWithFilterLookupKeyContainsPk() throws
> Exception {
> OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
> createHarness(JoinType.INNER_JOIN, FilterOnTable.WITH_FILTER,
> true);
> testHarness.open();
> testHarness.processElement(insertRecord(1, "a"));
> testHarness.processElement(insertRecord(2, "b"));
> testHarness.processElement(insertRecord(3, "c"));
> testHarness.processElement(insertRecord(4, "d"));
> testHarness.processElement(insertRecord(5, "e"));
> testHarness.processElement(updateBeforeRecord(3, "c"));
> testHarness.processElement(updateAfterRecord(3, "c2"));
> testHarness.processElement(deleteRecord(3, "c2"));
> testHarness.processElement(insertRecord(3, "c3"));
> List<Object> expectedOutput = new ArrayList<>();
> expectedOutput.add(insertRecord(1, "a", 1, "Julian"));
> expectedOutput.add(insertRecord(4, "d", 4, "Fabian"));
> expectedOutput.add(deleteRecord(3, "c", null, null));
> expectedOutput.add(insertRecord(3, "c2", 6, "Jark-2"));
> expectedOutput.add(deleteRecord(3, "c2", 6, "Jark-2"));
> expectedOutput.add(insertRecord(3, "c3", 9, "Jark-3"));
> assertor.assertOutputEquals("output wrong.", expectedOutput,
> testHarness.getOutput());
> testHarness.close();
> }
> {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)