lincoln lee created FLINK-34166:
-----------------------------------

             Summary: KeyedLookupJoinWrapper incorrectly processes delete message 
for inner join when previous lookup result is empty
                 Key: FLINK-34166
                 URL: https://issues.apache.org/jira/browse/FLINK-34166
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Runtime
    Affects Versions: 1.18.1, 1.17.2
            Reporter: lincoln lee
            Assignee: lincoln lee
             Fix For: 1.19.0, 1.18.2


When 'table.optimizer.non-deterministic-update.strategy' is set to 
'TRY_RESOLVE' and the lookup join has non-deterministic update (NDU) problems, 
KeyedLookupJoinWrapper incorrectly processes a delete message for an inner join 
when the previous lookup result is empty.

The intermediate delete result 
{code}
        expectedOutput.add(deleteRecord(3, "c", null, null));
{code}
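in the current test case 
KeyedLookupJoinHarnessTest#testTemporalInnerJoinWithFilterLookupKeyContainsPk 
is incorrect, since no row was emitted for insertRecord(3, "c") and there is 
therefore nothing to retract: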
{code}
    @Test
    public void testTemporalInnerJoinWithFilterLookupKeyContainsPk() throws Exception {
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
                createHarness(JoinType.INNER_JOIN, FilterOnTable.WITH_FILTER, true);

        testHarness.open();

        testHarness.processElement(insertRecord(1, "a"));
        testHarness.processElement(insertRecord(2, "b"));
        testHarness.processElement(insertRecord(3, "c"));
        testHarness.processElement(insertRecord(4, "d"));
        testHarness.processElement(insertRecord(5, "e"));
        testHarness.processElement(updateBeforeRecord(3, "c"));
        testHarness.processElement(updateAfterRecord(3, "c2"));
        testHarness.processElement(deleteRecord(3, "c2"));
        testHarness.processElement(insertRecord(3, "c3"));

        List<Object> expectedOutput = new ArrayList<>();
        expectedOutput.add(insertRecord(1, "a", 1, "Julian"));
        expectedOutput.add(insertRecord(4, "d", 4, "Fabian"));
        expectedOutput.add(deleteRecord(3, "c", null, null));
        expectedOutput.add(insertRecord(3, "c2", 6, "Jark-2"));
        expectedOutput.add(deleteRecord(3, "c2", 6, "Jark-2"));
        expectedOutput.add(insertRecord(3, "c3", 9, "Jark-3"));

        assertor.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
        testHarness.close();
    }
{code}
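
To illustrate the expected behavior, here is a minimal, self-contained sketch. 
It is not the KeyedLookupJoinWrapper code: the class LookupRetractionSketch, 
the method retractions, and the text encoding of rows are all hypothetical and 
exist only for illustration. It assumes that the rows to retract are derived 
from the lookup result stored for the key, and that only an outer join emits a 
null-padded row when that result is empty.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified model; not Flink's KeyedLookupJoinWrapper. */
final class LookupRetractionSketch {

    /**
     * Builds the rows to retract for a delete (or update-before) message.
     *
     * @param left left-side row rendered as text, e.g. "3,c"
     * @param previousLookup right-side rows stored for the key by the earlier lookup
     * @param isLeftOuterJoin true for a left outer join, false for an inner join
     */
    static List<String> retractions(
            String left, List<String> previousLookup, boolean isLeftOuterJoin) {
        List<String> out = new ArrayList<>();
        if (previousLookup.isEmpty()) {
            // Nothing matched earlier. An inner join emitted no row for this key,
            // so there is nothing to retract. Only a left outer join emitted a
            // null-padded row, which must now be withdrawn.
            if (isLeftOuterJoin) {
                out.add("-D[" + left + ",null,null]");
            }
            return out;
        }
        // One retraction per previously joined right-side row.
        for (String right : previousLookup) {
            out.add("-D[" + left + "," + right + "]");
        }
        return out;
    }
}
```

Under these assumptions, retracting (3, "c") with an empty previous lookup 
result yields no output for an inner join, while a left outer join yields a 
single null-padded delete row.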



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
