This is an automated email from the ASF dual-hosted git repository.
lincoln pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new b79d3f01a4e [hotfix][table] Avoid sending duplicate delete record for
left join when state expired
b79d3f01a4e is described below
commit b79d3f01a4e1160f097bc6a0273b8da65d626483
Author: lincoln lee <[email protected]>
AuthorDate: Mon Jan 22 20:33:42 2024 +0800
[hotfix][table] Avoid sending duplicate delete record for left join when
state expired
This closes #24164
---
.../join/lookup/KeyedLookupJoinWrapper.java | 9 +--
.../operators/join/KeyedLookupJoinHarnessTest.java | 83 ++++++++++++++++++++--
2 files changed, 80 insertions(+), 12 deletions(-)
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/KeyedLookupJoinWrapper.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/KeyedLookupJoinWrapper.java
index 5fd5ed16ffe..8c718658116 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/KeyedLookupJoinWrapper.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/KeyedLookupJoinWrapper.java
@@ -150,7 +150,7 @@ public class KeyedLookupJoinWrapper extends
KeyedProcessFunction<RowData, RowDat
RowData rightRow = uniqueState.value();
// should distinguish null from empty(join condition
unsatisfied)
if (null == rightRow) {
- stateStaledErrorHandle(in, out);
+ stateStaledErrorHandle();
} else if (!emptyRow.equals(rightRow)) {
collectDeleteRow(in, rightRow, out);
collected = true;
@@ -158,7 +158,7 @@ public class KeyedLookupJoinWrapper extends
KeyedProcessFunction<RowData, RowDat
} else {
List<RowData> rightRows = state.value();
if (null == rightRows) {
- stateStaledErrorHandle(in, out);
+ stateStaledErrorHandle();
} else {
for (RowData row : rightRows) {
if (!emptyRow.equals(row)) {
@@ -238,12 +238,9 @@ public class KeyedLookupJoinWrapper extends
KeyedProcessFunction<RowData, RowDat
}
}
- private void stateStaledErrorHandle(RowData in, Collector out) {
+ private void stateStaledErrorHandle() {
if (lenient) {
LOG.warn(STATE_CLEARED_WARN_MSG);
- if (lookupJoinRunner.isLeftOuterJoin) {
- lookupJoinRunner.padNullForLeftJoin(in, out);
- }
} else {
throw new RuntimeException(STATE_CLEARED_WARN_MSG);
}
diff --git
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/KeyedLookupJoinHarnessTest.java
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/KeyedLookupJoinHarnessTest.java
index 19505da9b07..229e9903ece 100644
---
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/KeyedLookupJoinHarnessTest.java
+++
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/KeyedLookupJoinHarnessTest.java
@@ -74,8 +74,6 @@ public class KeyedLookupJoinHarnessTest {
DataTypes.STRING().getLogicalType()
});
- StateTtlConfig ttlConfig = StateConfigUtil.createTtlConfig(10_000_000);
-
@Test
public void testTemporalInnerJoin() throws Exception {
OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
@@ -352,12 +350,85 @@ public class KeyedLookupJoinHarnessTest {
testHarness.close();
}
- //
---------------------------------------------------------------------------------
+ @Test
+ public void testTemporalLeftJoinWithTtlLookupKeyContainsPk() throws
Exception {
+ OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
+ createHarness(JoinType.LEFT_JOIN, FilterOnTable.WITH_FILTER,
true, 1_000);
+
+ testHarness.open();
+ // set TtlTimeProvider with 1
+ testHarness.setStateTtlProcessingTime(1);
+ testHarness.processElement(insertRecord(1, "a"));
+ testHarness.processElement(insertRecord(2, "b"));
+ testHarness.processElement(insertRecord(3, "c"));
+
+ // set TtlTimeProvider with 1002 (past the 1_000 TTL) to trigger expired state cleanup
+ testHarness.setStateTtlProcessingTime(1002);
+ // should output exactly one null-padded delete message since it's a left join
+ testHarness.processElement(deleteRecord(2, "b"));
+
+ testHarness.processElement(insertRecord(2, "b2"));
+ testHarness.processElement(updateBeforeRecord(3, "c"));
+ testHarness.processElement(updateAfterRecord(3, "c2"));
+
+ List<Object> expectedOutput = new ArrayList<>();
+ expectedOutput.add(insertRecord(1, "a", 1, "Julian"));
+ expectedOutput.add(insertRecord(2, "b", null, null));
+ expectedOutput.add(insertRecord(3, "c", null, null));
+ expectedOutput.add(deleteRecord(2, "b", null, null));
+ expectedOutput.add(insertRecord(2, "b2", 2, "default-2"));
+ expectedOutput.add(deleteRecord(3, "c", null, null));
+ expectedOutput.add(insertRecord(3, "c2", 6, "Jark-2"));
+
+ assertor.assertOutputEquals("output wrong.", expectedOutput,
testHarness.getOutput());
+ testHarness.close();
+ }
+
+ @Test
+ public void testTemporalInnerJoinWithTtlLookupKeyContainsPk() throws
Exception {
+ OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
+ createHarness(JoinType.INNER_JOIN, FilterOnTable.WITH_FILTER,
true, 1_000);
+
+ testHarness.open();
+ // set TtlTimeProvider with 1
+ testHarness.setStateTtlProcessingTime(1);
+ testHarness.processElement(insertRecord(1, "a"));
+ testHarness.processElement(insertRecord(2, "b"));
+ testHarness.processElement(insertRecord(3, "c"));
- @SuppressWarnings("unchecked")
+ // set TtlTimeProvider with 1002 (past the 1_000 TTL) to trigger expired state cleanup
+ testHarness.setStateTtlProcessingTime(1002);
+ // should not output any delete message since it's an inner join
+ testHarness.processElement(deleteRecord(2, "b"));
+
+ testHarness.processElement(insertRecord(2, "b2"));
+ testHarness.processElement(updateBeforeRecord(3, "c"));
+ testHarness.processElement(updateAfterRecord(3, "c2"));
+
+ List<Object> expectedOutput = new ArrayList<>();
+ expectedOutput.add(insertRecord(1, "a", 1, "Julian"));
+ expectedOutput.add(insertRecord(2, "b2", 2, "default-2"));
+ expectedOutput.add(insertRecord(3, "c2", 6, "Jark-2"));
+
+ assertor.assertOutputEquals("output wrong.", expectedOutput,
testHarness.getOutput());
+ testHarness.close();
+ }
+
+ //
---------------------------------------------------------------------------------
private KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData>
createHarness(
JoinType joinType, FilterOnTable filterOnTable, boolean
lookupKeyContainsPrimaryKey)
throws Exception {
+ return createHarness(joinType, filterOnTable,
lookupKeyContainsPrimaryKey, -1); // stateTtl < 1 selects the default TTL (1_000_000)
+ }
+
+ private KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData>
createHarness(
+ JoinType joinType,
+ FilterOnTable filterOnTable,
+ boolean lookupKeyContainsPrimaryKey,
+ long stateTtl)
+ throws Exception {
+ StateTtlConfig ttlConfig =
+ StateConfigUtil.createTtlConfig(stateTtl < 1 ? 1_000_000 :
stateTtl);
boolean isLeftJoin = joinType == JoinType.LEFT_JOIN;
LookupJoinRunner joinRunner;
TestingEvolvingOutputFetcherFunction fetcher;
@@ -372,7 +443,7 @@ public class KeyedLookupJoinHarnessTest {
new GeneratedFunctionWrapper<>(fetcher),
new GeneratedCollectorWrapper<>(
new
LookupJoinHarnessTest.TestingFetcherCollector()),
- new GeneratedFunctionWrapper(
+ new GeneratedFunctionWrapper<>(
new
LookupJoinHarnessTest.TestingPreFilterCondition()),
isLeftJoin,
2);
@@ -384,7 +455,7 @@ public class KeyedLookupJoinHarnessTest {
new
LookupJoinHarnessTest.CalculateOnTemporalTable()),
new GeneratedCollectorWrapper<>(
new
LookupJoinHarnessTest.TestingFetcherCollector()),
- new GeneratedFunctionWrapper(
+ new GeneratedFunctionWrapper<>(
new
LookupJoinHarnessTest.TestingPreFilterCondition()),
isLeftJoin,
2);