This is an automated email from the ASF dual-hosted git repository.

lincoln pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new b79d3f01a4e [hotfix][table] Avoid sending duplicate delete record for 
left join when state expired
b79d3f01a4e is described below

commit b79d3f01a4e1160f097bc6a0273b8da65d626483
Author: lincoln lee <[email protected]>
AuthorDate: Mon Jan 22 20:33:42 2024 +0800

    [hotfix][table] Avoid sending duplicate delete record for left join when 
state expired
    
    This closes #24164
---
 .../join/lookup/KeyedLookupJoinWrapper.java        |  9 +--
 .../operators/join/KeyedLookupJoinHarnessTest.java | 83 ++++++++++++++++++++--
 2 files changed, 80 insertions(+), 12 deletions(-)

diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/KeyedLookupJoinWrapper.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/KeyedLookupJoinWrapper.java
index 5fd5ed16ffe..8c718658116 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/KeyedLookupJoinWrapper.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/KeyedLookupJoinWrapper.java
@@ -150,7 +150,7 @@ public class KeyedLookupJoinWrapper extends 
KeyedProcessFunction<RowData, RowDat
                     RowData rightRow = uniqueState.value();
                     // should distinguish null from empty(join condition 
unsatisfied)
                     if (null == rightRow) {
-                        stateStaledErrorHandle(in, out);
+                        stateStaledErrorHandle();
                     } else if (!emptyRow.equals(rightRow)) {
                         collectDeleteRow(in, rightRow, out);
                         collected = true;
@@ -158,7 +158,7 @@ public class KeyedLookupJoinWrapper extends 
KeyedProcessFunction<RowData, RowDat
                 } else {
                     List<RowData> rightRows = state.value();
                     if (null == rightRows) {
-                        stateStaledErrorHandle(in, out);
+                        stateStaledErrorHandle();
                     } else {
                         for (RowData row : rightRows) {
                             if (!emptyRow.equals(row)) {
@@ -238,12 +238,9 @@ public class KeyedLookupJoinWrapper extends 
KeyedProcessFunction<RowData, RowDat
         }
     }
 
-    private void stateStaledErrorHandle(RowData in, Collector out) {
+    private void stateStaledErrorHandle() {
         if (lenient) {
             LOG.warn(STATE_CLEARED_WARN_MSG);
-            if (lookupJoinRunner.isLeftOuterJoin) {
-                lookupJoinRunner.padNullForLeftJoin(in, out);
-            }
         } else {
             throw new RuntimeException(STATE_CLEARED_WARN_MSG);
         }
diff --git 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/KeyedLookupJoinHarnessTest.java
 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/KeyedLookupJoinHarnessTest.java
index 19505da9b07..229e9903ece 100644
--- 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/KeyedLookupJoinHarnessTest.java
+++ 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/KeyedLookupJoinHarnessTest.java
@@ -74,8 +74,6 @@ public class KeyedLookupJoinHarnessTest {
                         DataTypes.STRING().getLogicalType()
                     });
 
-    StateTtlConfig ttlConfig = StateConfigUtil.createTtlConfig(10_000_000);
-
     @Test
     public void testTemporalInnerJoin() throws Exception {
         OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
@@ -352,12 +350,85 @@ public class KeyedLookupJoinHarnessTest {
         testHarness.close();
     }
 
-    // 
---------------------------------------------------------------------------------
+    @Test
+    public void testTemporalLeftJoinWithTtlLookupKeyContainsPk() throws 
Exception {
+        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
+                createHarness(JoinType.LEFT_JOIN, FilterOnTable.WITH_FILTER, 
true, 1_000);
+
+        testHarness.open();
+        // set TtlTimeProvider with 1
+        testHarness.setStateTtlProcessingTime(1);
+        testHarness.processElement(insertRecord(1, "a"));
+        testHarness.processElement(insertRecord(2, "b"));
+        testHarness.processElement(insertRecord(3, "c"));
+
+        // set TtlTimeProvider with 1001 to trigger expired state cleanup
+        testHarness.setStateTtlProcessingTime(1002);
+        // should output a delete message (pad null) since it's left join
+        testHarness.processElement(deleteRecord(2, "b"));
+
+        testHarness.processElement(insertRecord(2, "b2"));
+        testHarness.processElement(updateBeforeRecord(3, "c"));
+        testHarness.processElement(updateAfterRecord(3, "c2"));
+
+        List<Object> expectedOutput = new ArrayList<>();
+        expectedOutput.add(insertRecord(1, "a", 1, "Julian"));
+        expectedOutput.add(insertRecord(2, "b", null, null));
+        expectedOutput.add(insertRecord(3, "c", null, null));
+        expectedOutput.add(deleteRecord(2, "b", null, null));
+        expectedOutput.add(insertRecord(2, "b2", 2, "default-2"));
+        expectedOutput.add(deleteRecord(3, "c", null, null));
+        expectedOutput.add(insertRecord(3, "c2", 6, "Jark-2"));
+
+        assertor.assertOutputEquals("output wrong.", expectedOutput, 
testHarness.getOutput());
+        testHarness.close();
+    }
+
+    @Test
+    public void testTemporalInnerJoinWithTtlLookupKeyContainsPk() throws 
Exception {
+        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
+                createHarness(JoinType.INNER_JOIN, FilterOnTable.WITH_FILTER, 
true, 1_000);
+
+        testHarness.open();
+        // set TtlTimeProvider with 1
+        testHarness.setStateTtlProcessingTime(1);
+        testHarness.processElement(insertRecord(1, "a"));
+        testHarness.processElement(insertRecord(2, "b"));
+        testHarness.processElement(insertRecord(3, "c"));
 
-    @SuppressWarnings("unchecked")
+        // set TtlTimeProvider with 1001 to trigger expired state cleanup
+        testHarness.setStateTtlProcessingTime(1002);
+        // should not output a delete message (pad null) since it's inner join
+        testHarness.processElement(deleteRecord(2, "b"));
+
+        testHarness.processElement(insertRecord(2, "b2"));
+        testHarness.processElement(updateBeforeRecord(3, "c"));
+        testHarness.processElement(updateAfterRecord(3, "c2"));
+
+        List<Object> expectedOutput = new ArrayList<>();
+        expectedOutput.add(insertRecord(1, "a", 1, "Julian"));
+        expectedOutput.add(insertRecord(2, "b2", 2, "default-2"));
+        expectedOutput.add(insertRecord(3, "c2", 6, "Jark-2"));
+
+        assertor.assertOutputEquals("output wrong.", expectedOutput, 
testHarness.getOutput());
+        testHarness.close();
+    }
+
+    // 
---------------------------------------------------------------------------------
     private KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> 
createHarness(
             JoinType joinType, FilterOnTable filterOnTable, boolean 
lookupKeyContainsPrimaryKey)
             throws Exception {
+        return createHarness(joinType, filterOnTable, 
lookupKeyContainsPrimaryKey, -1);
+    }
+
+    private KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> 
createHarness(
+            JoinType joinType,
+            FilterOnTable filterOnTable,
+            boolean lookupKeyContainsPrimaryKey,
+            long stateTtl)
+            throws Exception {
+        StateTtlConfig ttlConfig =
+                StateConfigUtil.createTtlConfig(stateTtl < 1 ? 1_000_000 : 
stateTtl);
         boolean isLeftJoin = joinType == JoinType.LEFT_JOIN;
         LookupJoinRunner joinRunner;
         TestingEvolvingOutputFetcherFunction fetcher;
@@ -372,7 +443,7 @@ public class KeyedLookupJoinHarnessTest {
                             new GeneratedFunctionWrapper<>(fetcher),
                             new GeneratedCollectorWrapper<>(
                                     new 
LookupJoinHarnessTest.TestingFetcherCollector()),
-                            new GeneratedFunctionWrapper(
+                            new GeneratedFunctionWrapper<>(
                                     new 
LookupJoinHarnessTest.TestingPreFilterCondition()),
                             isLeftJoin,
                             2);
@@ -384,7 +455,7 @@ public class KeyedLookupJoinHarnessTest {
                                     new 
LookupJoinHarnessTest.CalculateOnTemporalTable()),
                             new GeneratedCollectorWrapper<>(
                                     new 
LookupJoinHarnessTest.TestingFetcherCollector()),
-                            new GeneratedFunctionWrapper(
+                            new GeneratedFunctionWrapper<>(
                                     new 
LookupJoinHarnessTest.TestingPreFilterCondition()),
                             isLeftJoin,
                             2);

Reply via email to