This is an automated email from the ASF dual-hosted git repository. ron pushed a commit to branch release-1.19 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.19 by this push: new 17e7c3eaf14 [FLINK-35184][table-runtime] Fix mini-batch join hash collision when use InputSideHasNoUniqueKeyBundle (#24749) 17e7c3eaf14 is described below commit 17e7c3eaf14b6c63f55d28a308e30ad6a3a80c95 Author: Roman Boyko <ro.v.bo...@gmail.com> AuthorDate: Fri May 10 10:57:45 2024 +0700 [FLINK-35184][table-runtime] Fix mini-batch join hash collision when use InputSideHasNoUniqueKeyBundle (#24749) --- .../bundle/InputSideHasNoUniqueKeyBundle.java | 25 ++++-- .../join/stream/StreamingJoinOperatorTestBase.java | 4 +- .../stream/StreamingMiniBatchJoinOperatorTest.java | 95 +++++++++++++++++----- 3 files changed, 93 insertions(+), 31 deletions(-) diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/bundle/InputSideHasNoUniqueKeyBundle.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/bundle/InputSideHasNoUniqueKeyBundle.java index b5738835b95..fdc9e1d5193 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/bundle/InputSideHasNoUniqueKeyBundle.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/bundle/InputSideHasNoUniqueKeyBundle.java @@ -96,15 +96,26 @@ public class InputSideHasNoUniqueKeyBundle extends BufferBundle<Map<Integer, Lis RowData rec = iterator.previous(); if ((RowDataUtil.isAccumulateMsg(record) && RowDataUtil.isRetractMsg(rec)) || (RowDataUtil.isRetractMsg(record) && RowDataUtil.isAccumulateMsg(rec))) { - iterator.remove(); - actualSize--; - if (list.isEmpty()) { - bundle.get(joinKey).remove(hashKey); - if (bundle.get(joinKey).isEmpty()) { - bundle.remove(joinKey); + // here it's necessary to additionally check that record == rec because hashKey of + // these two records might collide. For this purpose here RowKind is set to +I and + // after all it is returned to original value. + RowKind recRowKind = rec.getRowKind(); + RowKind recordRowKind = record.getRowKind(); + rec.setRowKind(RowKind.INSERT); + record.setRowKind(RowKind.INSERT); + if (record.equals(rec)) { + iterator.remove(); + actualSize--; + if (list.isEmpty()) { + bundle.get(joinKey).remove(hashKey); + if (bundle.get(joinKey).isEmpty()) { + bundle.remove(joinKey); + } } + return true; } - return true; + rec.setRowKind(recRowKind); + record.setRowKind(recordRowKind); } } return false; diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/stream/StreamingJoinOperatorTestBase.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/stream/StreamingJoinOperatorTestBase.java index 68afbe72999..392f83cf8cf 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/stream/StreamingJoinOperatorTestBase.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/stream/StreamingJoinOperatorTestBase.java @@ -41,7 +41,7 @@ import java.util.function.Function; /** Base test class for {@link AbstractStreamingJoinOperator}. */ public abstract class StreamingJoinOperatorTestBase { - protected final InternalTypeInfo<RowData> leftTypeInfo = + protected InternalTypeInfo<RowData> leftTypeInfo = InternalTypeInfo.of( RowType.of( new LogicalType[] { @@ -57,7 +57,7 @@ public abstract class StreamingJoinOperatorTestBase { new LogicalType[] {new CharType(false, 20), new CharType(true, 10)}, new String[] {"line_order_id0", "line_order_ship_mode"})); - protected final RowDataKeySelector leftKeySelector = + protected RowDataKeySelector leftKeySelector = HandwrittenSelectorUtil.getRowDataSelector( new int[] {1}, leftTypeInfo.toRowType().getChildren().toArray(new LogicalType[0])); diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/stream/StreamingMiniBatchJoinOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/stream/StreamingMiniBatchJoinOperatorTest.java index 62b8116a0b0..7e92f72cf5e 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/stream/StreamingMiniBatchJoinOperatorTest.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/stream/StreamingMiniBatchJoinOperatorTest.java @@ -25,13 +25,13 @@ import org.apache.flink.table.runtime.operators.bundle.trigger.CountCoBundleTrig import org.apache.flink.table.runtime.operators.join.FlinkJoinType; import org.apache.flink.table.runtime.operators.join.stream.state.JoinInputSideSpec; import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.table.types.logical.BigIntType; import org.apache.flink.table.types.logical.CharType; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.table.utils.HandwrittenSelectorUtil; import org.apache.flink.types.RowKind; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInfo; @@ -55,27 +55,6 @@ public final class StreamingMiniBatchJoinOperatorTest extends StreamingJoinOpera private RowDataKeySelector leftUniqueKeySelector; private RowDataKeySelector rightUniqueKeySelector; - @BeforeEach - public void beforeEach(TestInfo testInfo) throws Exception { - rightTypeInfo = - InternalTypeInfo.of( - RowType.of( - new LogicalType[] { - new CharType(false, 20), - new CharType(false, 20), - new CharType(true, 10) - }, - new String[] { - "order_id#", "line_order_id0", "line_order_ship_mode" - })); - - rightKeySelector = - HandwrittenSelectorUtil.getRowDataSelector( - new int[] {1}, - rightTypeInfo.toRowType().getChildren().toArray(new LogicalType[0])); - super.beforeEach(testInfo); - } - @Tag("miniBatchSize=3") @Test public void testLeftJoinWithLeftArriveFirst() throws Exception { @@ -849,6 +828,17 @@ public final class StreamingMiniBatchJoinOperatorTest extends StreamingJoinOpera "4 Bellevue Drive, Pottstown, PB 19464")); } + @Tag("miniBatchSize=4") + @Test + public void testInnerJoinWithNoUniqueKeyHashCollisionSimpleSchema() throws Exception { + testHarness.processElement2(insertRecord("1", 1L)); + testHarness.processElement1(insertRecord("1", 4294967296L)); + testHarness.processElement2(insertRecord("1", 4294967296L)); + testHarness.processElement2(deleteRecord("1", 1L)); + assertor.shouldEmit( + testHarness, rowOfKind(RowKind.INSERT, "1", 4294967296L, "1", 4294967296L)); + } + @Tag("miniBatchSize=6") @Test public void testInnerJoinWithNoUniqueKeyCrossBatches() throws Exception { @@ -1828,6 +1818,10 @@ public final class StreamingMiniBatchJoinOperatorTest extends StreamingJoinOpera @Override public MiniBatchStreamingJoinOperator createJoinOperator(TestInfo testInfo) { + leftTypeInfo = leftTypeInfoExtractor.apply(testInfo.getDisplayName()); + rightTypeInfo = rightTypeInfoExtractor.apply(testInfo.getDisplayName()); + leftKeySelector = leftKeySelectorExtractor.apply(testInfo.getDisplayName()); + rightKeySelector = rightKeySelectorExtractor.apply(testInfo.getDisplayName()); RowDataKeySelector[] keySelectors = ukSelectorExtractor.apply(testInfo.getDisplayName()); leftUniqueKeySelector = keySelectors[0]; rightUniqueKeySelector = keySelectors[1]; @@ -1953,4 +1947,61 @@ public final class StreamingMiniBatchJoinOperatorTest extends StreamingJoinOpera return FlinkJoinType.FULL; } }; + + private final Function<String, InternalTypeInfo<RowData>> leftTypeInfoExtractor = + (testDisplayName) -> { + if (testDisplayName.contains("SimpleSchema")) { + return InternalTypeInfo.of( + RowType.of( + new LogicalType[] {new CharType(false, 1), new BigIntType()}, + new String[] {"id1", "val1"})); + } else { + return leftTypeInfo; + } + }; + + private final Function<String, InternalTypeInfo<RowData>> rightTypeInfoExtractor = + (testDisplayName) -> { + if (testDisplayName.contains("SimpleSchema")) { + return InternalTypeInfo.of( + RowType.of( + new LogicalType[] {new CharType(false, 1), new BigIntType()}, + new String[] {"id2", "val2"})); + } else { + return InternalTypeInfo.of( + RowType.of( + new LogicalType[] { + new CharType(false, 20), + new CharType(false, 20), + new CharType(true, 10) + }, + new String[] { + "order_id#", "line_order_id0", "line_order_ship_mode" + })); + } + }; + + private final Function<String, RowDataKeySelector> leftKeySelectorExtractor = + (testDisplayName) -> { + if (testDisplayName.contains("SimpleSchema")) { + return HandwrittenSelectorUtil.getRowDataSelector( + new int[] {0}, + leftTypeInfo.toRowType().getChildren().toArray(new LogicalType[0])); + } else { + return leftKeySelector; + } + }; + + private final Function<String, RowDataKeySelector> rightKeySelectorExtractor = + (testDisplayName) -> { + if (testDisplayName.contains("SimpleSchema")) { + return HandwrittenSelectorUtil.getRowDataSelector( + new int[] {0}, + rightTypeInfo.toRowType().getChildren().toArray(new LogicalType[0])); + } else { + return HandwrittenSelectorUtil.getRowDataSelector( + new int[] {1}, + rightTypeInfo.toRowType().getChildren().toArray(new LogicalType[0])); + } + }; }