This is an automated email from the ASF dual-hosted git repository.

ron pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new f543cc543e9 [FLINK-35184][table-runtime] Fix mini-batch join hash collision when use InputSideHasNoUniqueKeyBundle
f543cc543e9 is described below

commit f543cc543e9b0eb05415095190e86d3b22cdf1a4
Author: Roman Boyko <ro.v.bo...@gmail.com>
AuthorDate: Tue Apr 23 12:13:58 2024 +0700

    [FLINK-35184][table-runtime] Fix mini-batch join hash collision when use InputSideHasNoUniqueKeyBundle
    
    This closes #24703
---
 .../bundle/InputSideHasNoUniqueKeyBundle.java      | 25 ++++--
 .../join/stream/StreamingJoinOperatorTestBase.java |  4 +-
 .../stream/StreamingMiniBatchJoinOperatorTest.java | 95 +++++++++++++++++-----
 3 files changed, 93 insertions(+), 31 deletions(-)

diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/bundle/InputSideHasNoUniqueKeyBundle.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/bundle/InputSideHasNoUniqueKeyBundle.java
index b5738835b95..fdc9e1d5193 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/bundle/InputSideHasNoUniqueKeyBundle.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/bundle/InputSideHasNoUniqueKeyBundle.java
@@ -96,15 +96,26 @@ public class InputSideHasNoUniqueKeyBundle extends BufferBundle<Map<Integer, Lis
             RowData rec = iterator.previous();
             if ((RowDataUtil.isAccumulateMsg(record) && RowDataUtil.isRetractMsg(rec))
                     || (RowDataUtil.isRetractMsg(record) && RowDataUtil.isAccumulateMsg(rec))) {
-                iterator.remove();
-                actualSize--;
-                if (list.isEmpty()) {
-                    bundle.get(joinKey).remove(hashKey);
-                    if (bundle.get(joinKey).isEmpty()) {
-                        bundle.remove(joinKey);
+                // here it's necessary to additionally check that record == rec because hashKey of
+                // these two records might collide. For this purpose here RowKind is set to +I and
+                // after all it is returned to original value.
+                RowKind recRowKind = rec.getRowKind();
+                RowKind recordRowKind = record.getRowKind();
+                rec.setRowKind(RowKind.INSERT);
+                record.setRowKind(RowKind.INSERT);
+                if (record.equals(rec)) {
+                    iterator.remove();
+                    actualSize--;
+                    if (list.isEmpty()) {
+                        bundle.get(joinKey).remove(hashKey);
+                        if (bundle.get(joinKey).isEmpty()) {
+                            bundle.remove(joinKey);
+                        }
                     }
+                    return true;
                 }
-                return true;
+                rec.setRowKind(recRowKind);
+                record.setRowKind(recordRowKind);
             }
         }
         return false;
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/stream/StreamingJoinOperatorTestBase.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/stream/StreamingJoinOperatorTestBase.java
index 68afbe72999..392f83cf8cf 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/stream/StreamingJoinOperatorTestBase.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/stream/StreamingJoinOperatorTestBase.java
@@ -41,7 +41,7 @@ import java.util.function.Function;
 /** Base test class for {@link AbstractStreamingJoinOperator}. */
 public abstract class StreamingJoinOperatorTestBase {
 
-    protected final InternalTypeInfo<RowData> leftTypeInfo =
+    protected InternalTypeInfo<RowData> leftTypeInfo =
             InternalTypeInfo.of(
                     RowType.of(
                             new LogicalType[] {
@@ -57,7 +57,7 @@ public abstract class StreamingJoinOperatorTestBase {
                             new LogicalType[] {new CharType(false, 20), new CharType(true, 10)},
                             new String[] {"line_order_id0", "line_order_ship_mode"}));
 
-    protected final RowDataKeySelector leftKeySelector =
+    protected RowDataKeySelector leftKeySelector =
             HandwrittenSelectorUtil.getRowDataSelector(
                     new int[] {1},
                     leftTypeInfo.toRowType().getChildren().toArray(new LogicalType[0]));
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/stream/StreamingMiniBatchJoinOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/stream/StreamingMiniBatchJoinOperatorTest.java
index 62b8116a0b0..7e92f72cf5e 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/stream/StreamingMiniBatchJoinOperatorTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/stream/StreamingMiniBatchJoinOperatorTest.java
@@ -25,13 +25,13 @@ import org.apache.flink.table.runtime.operators.bundle.trigger.CountCoBundleTrig
 import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
 import org.apache.flink.table.runtime.operators.join.stream.state.JoinInputSideSpec;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.BigIntType;
 import org.apache.flink.table.types.logical.CharType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.utils.HandwrittenSelectorUtil;
 import org.apache.flink.types.RowKind;
 
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
@@ -55,27 +55,6 @@ public final class StreamingMiniBatchJoinOperatorTest extends StreamingJoinOpera
     private RowDataKeySelector leftUniqueKeySelector;
     private RowDataKeySelector rightUniqueKeySelector;
 
-    @BeforeEach
-    public void beforeEach(TestInfo testInfo) throws Exception {
-        rightTypeInfo =
-                InternalTypeInfo.of(
-                        RowType.of(
-                                new LogicalType[] {
-                                    new CharType(false, 20),
-                                    new CharType(false, 20),
-                                    new CharType(true, 10)
-                                },
-                                new String[] {
-                                    "order_id#", "line_order_id0", "line_order_ship_mode"
-                                }));
-
-        rightKeySelector =
-                HandwrittenSelectorUtil.getRowDataSelector(
-                        new int[] {1},
-                        rightTypeInfo.toRowType().getChildren().toArray(new LogicalType[0]));
-        super.beforeEach(testInfo);
-    }
-
     @Tag("miniBatchSize=3")
     @Test
     public void testLeftJoinWithLeftArriveFirst() throws Exception {
@@ -849,6 +828,17 @@ public final class StreamingMiniBatchJoinOperatorTest extends StreamingJoinOpera
                         "4 Bellevue Drive, Pottstown, PB 19464"));
     }
 
+    @Tag("miniBatchSize=4")
+    @Test
+    public void testInnerJoinWithNoUniqueKeyHashCollisionSimpleSchema() throws Exception {
+        testHarness.processElement2(insertRecord("1", 1L));
+        testHarness.processElement1(insertRecord("1", 4294967296L));
+        testHarness.processElement2(insertRecord("1", 4294967296L));
+        testHarness.processElement2(deleteRecord("1", 1L));
+        assertor.shouldEmit(
+                testHarness, rowOfKind(RowKind.INSERT, "1", 4294967296L, "1", 4294967296L));
+    }
+
     @Tag("miniBatchSize=6")
     @Test
     public void testInnerJoinWithNoUniqueKeyCrossBatches() throws Exception {
@@ -1828,6 +1818,10 @@ public final class StreamingMiniBatchJoinOperatorTest extends StreamingJoinOpera
 
     @Override
     public MiniBatchStreamingJoinOperator createJoinOperator(TestInfo testInfo) {
+        leftTypeInfo = leftTypeInfoExtractor.apply(testInfo.getDisplayName());
+        rightTypeInfo = rightTypeInfoExtractor.apply(testInfo.getDisplayName());
+        leftKeySelector = leftKeySelectorExtractor.apply(testInfo.getDisplayName());
+        rightKeySelector = rightKeySelectorExtractor.apply(testInfo.getDisplayName());
         RowDataKeySelector[] keySelectors = ukSelectorExtractor.apply(testInfo.getDisplayName());
         leftUniqueKeySelector = keySelectors[0];
         rightUniqueKeySelector = keySelectors[1];
@@ -1953,4 +1947,61 @@ public final class StreamingMiniBatchJoinOperatorTest extends StreamingJoinOpera
                     return FlinkJoinType.FULL;
                 }
             };
+
+    private final Function<String, InternalTypeInfo<RowData>> leftTypeInfoExtractor =
+            (testDisplayName) -> {
+                if (testDisplayName.contains("SimpleSchema")) {
+                    return InternalTypeInfo.of(
+                            RowType.of(
+                                    new LogicalType[] {new CharType(false, 1), new BigIntType()},
+                                    new String[] {"id1", "val1"}));
+                } else {
+                    return leftTypeInfo;
+                }
+            };
+
+    private final Function<String, InternalTypeInfo<RowData>> rightTypeInfoExtractor =
+            (testDisplayName) -> {
+                if (testDisplayName.contains("SimpleSchema")) {
+                    return InternalTypeInfo.of(
+                            RowType.of(
+                                    new LogicalType[] {new CharType(false, 1), new BigIntType()},
+                                    new String[] {"id2", "val2"}));
+                } else {
+                    return InternalTypeInfo.of(
+                            RowType.of(
+                                    new LogicalType[] {
+                                        new CharType(false, 20),
+                                        new CharType(false, 20),
+                                        new CharType(true, 10)
+                                    },
+                                    new String[] {
+                                        "order_id#", "line_order_id0", "line_order_ship_mode"
+                                    }));
+                }
+            };
+
+    private final Function<String, RowDataKeySelector> leftKeySelectorExtractor =
+            (testDisplayName) -> {
+                if (testDisplayName.contains("SimpleSchema")) {
+                    return HandwrittenSelectorUtil.getRowDataSelector(
+                            new int[] {0},
+                            leftTypeInfo.toRowType().getChildren().toArray(new LogicalType[0]));
+                } else {
+                    return leftKeySelector;
+                }
+            };
+
+    private final Function<String, RowDataKeySelector> rightKeySelectorExtractor =
+            (testDisplayName) -> {
+                if (testDisplayName.contains("SimpleSchema")) {
+                    return HandwrittenSelectorUtil.getRowDataSelector(
+                            new int[] {0},
+                            rightTypeInfo.toRowType().getChildren().toArray(new LogicalType[0]));
+                } else {
+                    return HandwrittenSelectorUtil.getRowDataSelector(
+                            new int[] {1},
+                            rightTypeInfo.toRowType().getChildren().toArray(new LogicalType[0]));
+                }
+            };
 }

Reply via email to