This is an automated email from the ASF dual-hosted git repository.

lincoln pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.17 by this push:
     new 4e795b5f01a [FLINK-34166][table] Fix KeyedLookupJoinWrapper incorrectly process delete message for inner join when previous lookup result is empty
4e795b5f01a is described below

commit 4e795b5f01a42659367a55d9913e60b599247a92
Author: lincoln lee <[email protected]>
AuthorDate: Tue Jan 23 19:41:04 2024 +0800

    [FLINK-34166][table] Fix KeyedLookupJoinWrapper incorrectly process delete message for inner join when previous lookup result is empty
    
    This closes #24167
---
 .../join/lookup/KeyedLookupJoinWrapper.java        | 29 ++++---
 .../operators/join/lookup/LookupJoinRunner.java    |  2 +-
 .../operators/join/KeyedLookupJoinHarnessTest.java | 92 ++++++++++++++++++++--
 3 files changed, 105 insertions(+), 18 deletions(-)

diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/KeyedLookupJoinWrapper.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/KeyedLookupJoinWrapper.java
index 17eb2c8a319..3b0e2318691 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/KeyedLookupJoinWrapper.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/KeyedLookupJoinWrapper.java
@@ -134,34 +134,46 @@ public class KeyedLookupJoinWrapper extends KeyedProcessFunction<RowData, RowDat
             // fetcher has copied the input field when object reuse is enabled
             lookupJoinRunner.doFetch(in);
 
-            // update state will empty row if lookup miss
+            // update state with empty row if join condition unsatisfied
             if (!collectListener.collected) {
                 updateState(emptyRow);
             }
 
             lookupJoinRunner.padNullForLeftJoin(in, out);
         } else {
+            boolean collected = false;
             // do state access for non-acc msg
             if (lookupKeyContainsPrimaryKey) {
                 RowData rightRow = uniqueState.value();
-                // should distinguish null from empty(lookup miss)
+                // should distinguish null from empty(join condition unsatisfied)
                 if (null == rightRow) {
-                    stateStaledErrorHandle(in, out);
-                } else {
+                    stateStaledErrorHandle();
+                } else if (!emptyRow.equals(rightRow)) {
                     collectDeleteRow(in, rightRow, out);
+                    collected = true;
                 }
             } else {
                 List<RowData> rightRows = state.value();
                 if (null == rightRows) {
-                    stateStaledErrorHandle(in, out);
+                    stateStaledErrorHandle();
                 } else {
                     for (RowData row : rightRows) {
-                        collectDeleteRow(in, row, out);
+                        if (!emptyRow.equals(row)) {
+                            collectDeleteRow(in, row, out);
+                            collected = true;
+                        }
                     }
                 }
             }
             // clear state at last
             deleteState();
+
+            // pad null for left join if no delete row collected from state, here we can't use the
+            // collector's status to determine whether the row is collected or not, because data
+            // fetched from state is not collected by the collector
+            if (lookupJoinRunner.isLeftOuterJoin && !collected) {
+                collectDeleteRow(in, lookupJoinRunner.nullRow, out);
+            }
         }
     }
 
@@ -222,12 +234,9 @@ public class KeyedLookupJoinWrapper extends KeyedProcessFunction<RowData, RowDat
         }
     }
 
-    private void stateStaledErrorHandle(RowData in, Collector out) {
+    private void stateStaledErrorHandle() {
         if (lenient) {
             LOG.warn(STATE_CLEARED_WARN_MSG);
-            if (lookupJoinRunner.isLeftOuterJoin) {
-                lookupJoinRunner.padNullForLeftJoin(in, out);
-            }
         } else {
             throw new RuntimeException(STATE_CLEARED_WARN_MSG);
         }
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/LookupJoinRunner.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/LookupJoinRunner.java
index f0d99384c45..8d79392d3c5 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/LookupJoinRunner.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/LookupJoinRunner.java
@@ -42,7 +42,7 @@ public class LookupJoinRunner extends ProcessFunction<RowData, RowData> {
     private transient FlatMapFunction<RowData, RowData> fetcher;
     protected transient ListenableCollector<RowData> collector;
     protected transient JoinedRowData outRow;
-    private transient GenericRowData nullRow;
+    protected transient GenericRowData nullRow;
 
     public LookupJoinRunner(
             GeneratedFunction<FlatMapFunction<RowData, RowData>> generatedFetcher,
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/KeyedLookupJoinHarnessTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/KeyedLookupJoinHarnessTest.java
index a4ed9af4b44..daaec0c55d3 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/KeyedLookupJoinHarnessTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/KeyedLookupJoinHarnessTest.java
@@ -74,8 +74,6 @@ public class KeyedLookupJoinHarnessTest {
                         DataTypes.STRING().getLogicalType()
                     });
 
-    StateTtlConfig ttlConfig = StateConfigUtil.createTtlConfig(10_000_000);
-
     @Test
     public void testTemporalInnerJoin() throws Exception {
         OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
@@ -200,7 +198,6 @@ public class KeyedLookupJoinHarnessTest {
         List<Object> expectedOutput = new ArrayList<>();
         expectedOutput.add(insertRecord(1, "a", 1, "Julian"));
         expectedOutput.add(insertRecord(4, "d", 4, "Fabian"));
-        expectedOutput.add(deleteRecord(3, "c", null, null));
         expectedOutput.add(insertRecord(3, "c2", 6, "Jark-2"));
         expectedOutput.add(deleteRecord(3, "c2", 6, "Jark-2"));
         expectedOutput.add(insertRecord(3, "c3", 9, "Jark-3"));
@@ -288,6 +285,8 @@ public class KeyedLookupJoinHarnessTest {
         testHarness.processElement(updateAfterRecord(3, "c2"));
         testHarness.processElement(deleteRecord(3, "c2"));
         testHarness.processElement(insertRecord(3, "c3"));
+        testHarness.processElement(deleteRecord(4, "d"));
+        testHarness.processElement(insertRecord(4, null));
 
         List<Object> expectedOutput = new ArrayList<>();
         expectedOutput.add(insertRecord(1, "a", 1, "Julian"));
@@ -304,6 +303,8 @@ public class KeyedLookupJoinHarnessTest {
         expectedOutput.add(deleteRecord(3, "c2", 6, "Jackson-2"));
         expectedOutput.add(insertRecord(3, "c3", 9, "Jark-3"));
         expectedOutput.add(insertRecord(3, "c3", 9, "Jackson-3"));
+        expectedOutput.add(deleteRecord(4, "d", 4, "Fabian"));
+        expectedOutput.add(insertRecord(4, null, 8, "Fabian-2"));
 
         assertor.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
         testHarness.close();
@@ -327,6 +328,8 @@ public class KeyedLookupJoinHarnessTest {
         testHarness.processElement(updateAfterRecord(3, "c2"));
         testHarness.processElement(deleteRecord(3, "c2"));
         testHarness.processElement(insertRecord(3, "c3"));
+        testHarness.processElement(deleteRecord(4, "d"));
+        testHarness.processElement(insertRecord(4, null));
 
         List<Object> expectedOutput = new ArrayList<>();
         expectedOutput.add(insertRecord(1, "a", 1, "Julian"));
@@ -340,17 +343,92 @@ public class KeyedLookupJoinHarnessTest {
         expectedOutput.add(insertRecord(3, "c2", 6, "Jark-2"));
         expectedOutput.add(deleteRecord(3, "c2", 6, "Jark-2"));
         expectedOutput.add(insertRecord(3, "c3", 9, "Jark-3"));
+        expectedOutput.add(deleteRecord(4, "d", 4, "Fabian"));
+        expectedOutput.add(insertRecord(4, null, 8, "Fabian-2"));
 
         assertor.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
         testHarness.close();
     }
 
-    // ---------------------------------------------------------------------------------
+    @Test
+    public void testTemporalLeftJoinWithTtlLookupKeyContainsPk() throws Exception {
+        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
+                createHarness(JoinType.LEFT_JOIN, FilterOnTable.WITH_FILTER, true, 1_000);
+
+        testHarness.open();
+        // set TtlTimeProvider with 1
+        testHarness.setStateTtlProcessingTime(1);
+        testHarness.processElement(insertRecord(1, "a"));
+        testHarness.processElement(insertRecord(2, "b"));
+        testHarness.processElement(insertRecord(3, "c"));
+
+        // set TtlTimeProvider with 1002 to trigger expired state cleanup
+        testHarness.setStateTtlProcessingTime(1002);
+        // should output a delete message (pad null) since it's left join
+        testHarness.processElement(deleteRecord(2, "b"));
+
+        testHarness.processElement(insertRecord(2, "b2"));
+        testHarness.processElement(updateBeforeRecord(3, "c"));
+        testHarness.processElement(updateAfterRecord(3, "c2"));
+
+        List<Object> expectedOutput = new ArrayList<>();
+        expectedOutput.add(insertRecord(1, "a", 1, "Julian"));
+        expectedOutput.add(insertRecord(2, "b", null, null));
+        expectedOutput.add(insertRecord(3, "c", null, null));
+        expectedOutput.add(deleteRecord(2, "b", null, null));
+        expectedOutput.add(insertRecord(2, "b2", 2, "default-2"));
+        expectedOutput.add(deleteRecord(3, "c", null, null));
+        expectedOutput.add(insertRecord(3, "c2", 6, "Jark-2"));
+
+        assertor.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
+        testHarness.close();
+    }
 
-    @SuppressWarnings("unchecked")
+    @Test
+    public void testTemporalInnerJoinWithTtlLookupKeyContainsPk() throws Exception {
+        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
+                createHarness(JoinType.INNER_JOIN, FilterOnTable.WITH_FILTER, true, 1_000);
+
+        testHarness.open();
+        // set TtlTimeProvider with 1
+        testHarness.setStateTtlProcessingTime(1);
+        testHarness.processElement(insertRecord(1, "a"));
+        testHarness.processElement(insertRecord(2, "b"));
+        testHarness.processElement(insertRecord(3, "c"));
+
+        // set TtlTimeProvider with 1002 to trigger expired state cleanup
+        testHarness.setStateTtlProcessingTime(1002);
+        // should not output a delete message (pad null) since it's inner join
+        testHarness.processElement(deleteRecord(2, "b"));
+
+        testHarness.processElement(insertRecord(2, "b2"));
+        testHarness.processElement(updateBeforeRecord(3, "c"));
+        testHarness.processElement(updateAfterRecord(3, "c2"));
+
+        List<Object> expectedOutput = new ArrayList<>();
+        expectedOutput.add(insertRecord(1, "a", 1, "Julian"));
+        expectedOutput.add(insertRecord(2, "b2", 2, "default-2"));
+        expectedOutput.add(insertRecord(3, "c2", 6, "Jark-2"));
+
+        assertor.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
+        testHarness.close();
+    }
+
+    // ---------------------------------------------------------------------------------
     private KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> createHarness(
             JoinType joinType, FilterOnTable filterOnTable, boolean lookupKeyContainsPrimaryKey)
             throws Exception {
+        return createHarness(joinType, filterOnTable, lookupKeyContainsPrimaryKey, -1);
+    }
+
+    private KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> createHarness(
+            JoinType joinType,
+            FilterOnTable filterOnTable,
+            boolean lookupKeyContainsPrimaryKey,
+            long stateTtl)
+            throws Exception {
+        StateTtlConfig ttlConfig =
+                StateConfigUtil.createTtlConfig(stateTtl < 1 ? 1_000_000 : stateTtl);
         boolean isLeftJoin = joinType == JoinType.LEFT_JOIN;
         LookupJoinRunner joinRunner;
         TestingEvolvingOutputFetcherFunction fetcher;
@@ -486,8 +564,8 @@ public class KeyedLookupJoinHarnessTest {
             int currentCnt = counter(id);
             List<GenericRowData> rows = lookup(id);
             if (rows != null) {
-                for (int i = 0; i < rows.size(); i++) {
-                    collectUpdatedRow(rows.get(i), currentCnt, out);
+                for (GenericRowData row : rows) {
+                    collectUpdatedRow(row, currentCnt, out);
                 }
             } else if (currentCnt > 1) {
                 // return a default value for which lookup miss at 1st time

Reply via email to