This is an automated email from the ASF dual-hosted git repository.
lincoln pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.17 by this push:
new 4e795b5f01a [FLINK-34166][table] Fix KeyedLookupJoinWrapper
incorrectly processing delete messages for inner join when the previous lookup result
is empty
4e795b5f01a is described below
commit 4e795b5f01a42659367a55d9913e60b599247a92
Author: lincoln lee <[email protected]>
AuthorDate: Tue Jan 23 19:41:04 2024 +0800
[FLINK-34166][table] Fix KeyedLookupJoinWrapper incorrectly processing delete
messages for inner join when the previous lookup result is empty
This closes #24167
---
.../join/lookup/KeyedLookupJoinWrapper.java | 29 ++++---
.../operators/join/lookup/LookupJoinRunner.java | 2 +-
.../operators/join/KeyedLookupJoinHarnessTest.java | 92 ++++++++++++++++++++--
3 files changed, 105 insertions(+), 18 deletions(-)
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/KeyedLookupJoinWrapper.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/KeyedLookupJoinWrapper.java
index 17eb2c8a319..3b0e2318691 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/KeyedLookupJoinWrapper.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/KeyedLookupJoinWrapper.java
@@ -134,34 +134,46 @@ public class KeyedLookupJoinWrapper extends
KeyedProcessFunction<RowData, RowDat
// fetcher has copied the input field when object reuse is enabled
lookupJoinRunner.doFetch(in);
- // update state will empty row if lookup miss
+ // update state with empty row if join condition unsatisfied
if (!collectListener.collected) {
updateState(emptyRow);
}
lookupJoinRunner.padNullForLeftJoin(in, out);
} else {
+ boolean collected = false;
// do state access for non-acc msg
if (lookupKeyContainsPrimaryKey) {
RowData rightRow = uniqueState.value();
- // should distinguish null from empty(lookup miss)
+ // should distinguish null from empty(join condition
unsatisfied)
if (null == rightRow) {
- stateStaledErrorHandle(in, out);
- } else {
+ stateStaledErrorHandle();
+ } else if (!emptyRow.equals(rightRow)) {
collectDeleteRow(in, rightRow, out);
+ collected = true;
}
} else {
List<RowData> rightRows = state.value();
if (null == rightRows) {
- stateStaledErrorHandle(in, out);
+ stateStaledErrorHandle();
} else {
for (RowData row : rightRows) {
- collectDeleteRow(in, row, out);
+ if (!emptyRow.equals(row)) {
+ collectDeleteRow(in, row, out);
+ collected = true;
+ }
}
}
}
// clear state at last
deleteState();
+
+ // pad null for left join if no delete row collected from state,
here we can't use the
+ // collector's status to determine whether the row is collected or
not, because data
+ // fetched from state is not collected by the collector
+ if (lookupJoinRunner.isLeftOuterJoin && !collected) {
+ collectDeleteRow(in, lookupJoinRunner.nullRow, out);
+ }
}
}
@@ -222,12 +234,9 @@ public class KeyedLookupJoinWrapper extends
KeyedProcessFunction<RowData, RowDat
}
}
- private void stateStaledErrorHandle(RowData in, Collector out) {
+ private void stateStaledErrorHandle() {
if (lenient) {
LOG.warn(STATE_CLEARED_WARN_MSG);
- if (lookupJoinRunner.isLeftOuterJoin) {
- lookupJoinRunner.padNullForLeftJoin(in, out);
- }
} else {
throw new RuntimeException(STATE_CLEARED_WARN_MSG);
}
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/LookupJoinRunner.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/LookupJoinRunner.java
index f0d99384c45..8d79392d3c5 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/LookupJoinRunner.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/LookupJoinRunner.java
@@ -42,7 +42,7 @@ public class LookupJoinRunner extends
ProcessFunction<RowData, RowData> {
private transient FlatMapFunction<RowData, RowData> fetcher;
protected transient ListenableCollector<RowData> collector;
protected transient JoinedRowData outRow;
- private transient GenericRowData nullRow;
+ protected transient GenericRowData nullRow;
public LookupJoinRunner(
GeneratedFunction<FlatMapFunction<RowData, RowData>>
generatedFetcher,
diff --git
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/KeyedLookupJoinHarnessTest.java
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/KeyedLookupJoinHarnessTest.java
index a4ed9af4b44..daaec0c55d3 100644
---
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/KeyedLookupJoinHarnessTest.java
+++
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/KeyedLookupJoinHarnessTest.java
@@ -74,8 +74,6 @@ public class KeyedLookupJoinHarnessTest {
DataTypes.STRING().getLogicalType()
});
- StateTtlConfig ttlConfig = StateConfigUtil.createTtlConfig(10_000_000);
-
@Test
public void testTemporalInnerJoin() throws Exception {
OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
@@ -200,7 +198,6 @@ public class KeyedLookupJoinHarnessTest {
List<Object> expectedOutput = new ArrayList<>();
expectedOutput.add(insertRecord(1, "a", 1, "Julian"));
expectedOutput.add(insertRecord(4, "d", 4, "Fabian"));
- expectedOutput.add(deleteRecord(3, "c", null, null));
expectedOutput.add(insertRecord(3, "c2", 6, "Jark-2"));
expectedOutput.add(deleteRecord(3, "c2", 6, "Jark-2"));
expectedOutput.add(insertRecord(3, "c3", 9, "Jark-3"));
@@ -288,6 +285,8 @@ public class KeyedLookupJoinHarnessTest {
testHarness.processElement(updateAfterRecord(3, "c2"));
testHarness.processElement(deleteRecord(3, "c2"));
testHarness.processElement(insertRecord(3, "c3"));
+ testHarness.processElement(deleteRecord(4, "d"));
+ testHarness.processElement(insertRecord(4, null));
List<Object> expectedOutput = new ArrayList<>();
expectedOutput.add(insertRecord(1, "a", 1, "Julian"));
@@ -304,6 +303,8 @@ public class KeyedLookupJoinHarnessTest {
expectedOutput.add(deleteRecord(3, "c2", 6, "Jackson-2"));
expectedOutput.add(insertRecord(3, "c3", 9, "Jark-3"));
expectedOutput.add(insertRecord(3, "c3", 9, "Jackson-3"));
+ expectedOutput.add(deleteRecord(4, "d", 4, "Fabian"));
+ expectedOutput.add(insertRecord(4, null, 8, "Fabian-2"));
assertor.assertOutputEquals("output wrong.", expectedOutput,
testHarness.getOutput());
testHarness.close();
@@ -327,6 +328,8 @@ public class KeyedLookupJoinHarnessTest {
testHarness.processElement(updateAfterRecord(3, "c2"));
testHarness.processElement(deleteRecord(3, "c2"));
testHarness.processElement(insertRecord(3, "c3"));
+ testHarness.processElement(deleteRecord(4, "d"));
+ testHarness.processElement(insertRecord(4, null));
List<Object> expectedOutput = new ArrayList<>();
expectedOutput.add(insertRecord(1, "a", 1, "Julian"));
@@ -340,17 +343,92 @@ public class KeyedLookupJoinHarnessTest {
expectedOutput.add(insertRecord(3, "c2", 6, "Jark-2"));
expectedOutput.add(deleteRecord(3, "c2", 6, "Jark-2"));
expectedOutput.add(insertRecord(3, "c3", 9, "Jark-3"));
+ expectedOutput.add(deleteRecord(4, "d", 4, "Fabian"));
+ expectedOutput.add(insertRecord(4, null, 8, "Fabian-2"));
assertor.assertOutputEquals("output wrong.", expectedOutput,
testHarness.getOutput());
testHarness.close();
}
- //
---------------------------------------------------------------------------------
+ @Test
+ public void testTemporalLeftJoinWithTtlLookupKeyContainsPk() throws
Exception {
+ OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
+ createHarness(JoinType.LEFT_JOIN, FilterOnTable.WITH_FILTER,
true, 1_000);
+
+ testHarness.open();
+ // set TtlTimeProvider to 1: the state written by the inserts below is stamped at 1
+ testHarness.setStateTtlProcessingTime(1);
+ testHarness.processElement(insertRecord(1, "a"));
+ testHarness.processElement(insertRecord(2, "b"));
+ testHarness.processElement(insertRecord(3, "c"));
+
+ // advance TtlTimeProvider to 1002, past the 1_000 TTL of state stamped at 1, so it is expired
+ testHarness.setStateTtlProcessingTime(1002);
+ // state for key 2 has expired: left join should still output a null-padded delete message
+ testHarness.processElement(deleteRecord(2, "b"));
+
+ testHarness.processElement(insertRecord(2, "b2"));
+ testHarness.processElement(updateBeforeRecord(3, "c"));
+ testHarness.processElement(updateAfterRecord(3, "c2"));
+
+ List<Object> expectedOutput = new ArrayList<>();
+ expectedOutput.add(insertRecord(1, "a", 1, "Julian"));
+ expectedOutput.add(insertRecord(2, "b", null, null));
+ expectedOutput.add(insertRecord(3, "c", null, null));
+ expectedOutput.add(deleteRecord(2, "b", null, null));
+ expectedOutput.add(insertRecord(2, "b2", 2, "default-2"));
+ expectedOutput.add(deleteRecord(3, "c", null, null));
+ expectedOutput.add(insertRecord(3, "c2", 6, "Jark-2"));
+
+ assertor.assertOutputEquals("output wrong.", expectedOutput,
testHarness.getOutput());
+ testHarness.close();
+ }
- @SuppressWarnings("unchecked")
+ @Test
+ public void testTemporalInnerJoinWithTtlLookupKeyContainsPk() throws
Exception {
+ OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
+ createHarness(JoinType.INNER_JOIN, FilterOnTable.WITH_FILTER,
true, 1_000);
+
+ testHarness.open();
+ // set TtlTimeProvider to 1: the state written by the inserts below is stamped at 1
+ testHarness.setStateTtlProcessingTime(1);
+ testHarness.processElement(insertRecord(1, "a"));
+ testHarness.processElement(insertRecord(2, "b"));
+ testHarness.processElement(insertRecord(3, "c"));
+
+ // advance TtlTimeProvider to 1002, past the 1_000 TTL of state stamped at 1, so it is expired
+ testHarness.setStateTtlProcessingTime(1002);
+ // state for key 2 has expired: inner join must not output any delete message (no null padding)
+ testHarness.processElement(deleteRecord(2, "b"));
+
+ testHarness.processElement(insertRecord(2, "b2"));
+ testHarness.processElement(updateBeforeRecord(3, "c"));
+ testHarness.processElement(updateAfterRecord(3, "c2"));
+
+ List<Object> expectedOutput = new ArrayList<>();
+ expectedOutput.add(insertRecord(1, "a", 1, "Julian"));
+ expectedOutput.add(insertRecord(2, "b2", 2, "default-2"));
+ expectedOutput.add(insertRecord(3, "c2", 6, "Jark-2"));
+
+ assertor.assertOutputEquals("output wrong.", expectedOutput,
testHarness.getOutput());
+ testHarness.close();
+ }
+
+ //
---------------------------------------------------------------------------------
private KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData>
createHarness(
JoinType joinType, FilterOnTable filterOnTable, boolean
lookupKeyContainsPrimaryKey)
throws Exception {
+ return createHarness(joinType, filterOnTable,
lookupKeyContainsPrimaryKey, -1); // stateTtl < 1 selects the default TTL of 1_000_000
+ }
+
+ private KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData>
createHarness(
+ JoinType joinType,
+ FilterOnTable filterOnTable,
+ boolean lookupKeyContainsPrimaryKey,
+ long stateTtl)
+ throws Exception {
+ StateTtlConfig ttlConfig =
+ StateConfigUtil.createTtlConfig(stateTtl < 1 ? 1_000_000 :
stateTtl);
boolean isLeftJoin = joinType == JoinType.LEFT_JOIN;
LookupJoinRunner joinRunner;
TestingEvolvingOutputFetcherFunction fetcher;
@@ -486,8 +564,8 @@ public class KeyedLookupJoinHarnessTest {
int currentCnt = counter(id);
List<GenericRowData> rows = lookup(id);
if (rows != null) {
- for (int i = 0; i < rows.size(); i++) {
- collectUpdatedRow(rows.get(i), currentCnt, out);
+ for (GenericRowData row : rows) {
+ collectUpdatedRow(row, currentCnt, out);
}
} else if (currentCnt > 1) {
// return a default value for which lookup miss at 1st time