xishuaidelin commented on code in PR #24703:
URL: https://github.com/apache/flink/pull/24703#discussion_r1575973786
##########
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/stream/StreamingMiniBatchJoinOperatorTest.java:
##########
@@ -849,6 +851,46 @@ public void testInnerJoinWithNoUniqueKeyWithinBatch() throws Exception {
"4 Bellevue Drive, Pottstown, PB 19464"));
}
+ @Tag("miniBatchSize=4")
+ @Test
+ public void testInnerJoinWithNoUniqueKeyHashCollision(TestInfo testInfo) throws Exception {
Review Comment:
[FLINK-35184.patch](https://github.com/apache/flink/files/15074895/FLINK-35184.patch)
Could you make some adjustments to this test? Currently, all tests use the
same schema. If a new schema is being added, could you refactor this part?
The attachment contains my modifications; your feedback is welcome.
##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/bundle/InputSideHasNoUniqueKeyBundle.java:
##########
@@ -96,15 +96,19 @@ private boolean foldRecord(RowData joinKey, int hashKey, RowData record) {
RowData rec = iterator.previous();
if ((RowDataUtil.isAccumulateMsg(record) && RowDataUtil.isRetractMsg(rec))
|| (RowDataUtil.isRetractMsg(record) && RowDataUtil.isAccumulateMsg(rec))) {
- iterator.remove();
- actualSize--;
- if (list.isEmpty()) {
- bundle.get(joinKey).remove(hashKey);
- if (bundle.get(joinKey).isEmpty()) {
- bundle.remove(joinKey);
+ // here it's necessary to additionally check that record == rec because hashKey of
+ // these two records might collide
+ if (record.equals(rec)) {
Review Comment:
BTW, the record in the list has its own RowKind, which needs to be ignored in
this comparison.
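
To illustrate the RowKind concern, here is a minimal sketch, assuming a row type whose `equals()` also compares the row kind. `Row` and `Kind` are hypothetical stand-ins for Flink's `RowData` and `RowKind`, and `equalsIgnoringKind` is an illustrative helper, not code from this PR:

```java
import java.util.Arrays;

public class RowKindAgnosticEquality {

    // Hypothetical stand-in for Flink's RowKind.
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    // Hypothetical stand-in for a RowData implementation; equals() includes the kind.
    static final class Row {
        Kind kind;
        final Object[] fields;

        Row(Kind kind, Object... fields) {
            this.kind = kind;
            this.fields = fields;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Row)) {
                return false;
            }
            Row other = (Row) o;
            return kind == other.kind && Arrays.equals(fields, other.fields);
        }

        @Override
        public int hashCode() {
            return 31 * kind.hashCode() + Arrays.hashCode(fields);
        }
    }

    // Compares field contents only: temporarily aligns b's kind with a's,
    // then restores b's original kind so the stored record is left unchanged.
    static boolean equalsIgnoringKind(Row a, Row b) {
        Kind saved = b.kind;
        b.kind = a.kind;
        try {
            return a.equals(b);
        } finally {
            b.kind = saved;
        }
    }

    public static void main(String[] args) {
        Row accumulate = new Row(Kind.INSERT, 1, "a");
        Row retract = new Row(Kind.DELETE, 1, "a");

        // A plain equals() never matches an accumulate/retract pair,
        // because the two rows differ in kind by definition.
        System.out.println(accumulate.equals(retract));             // false
        System.out.println(equalsIgnoringKind(accumulate, retract)); // true
    }
}
```

In the sketch, a plain `equals()` can never fold an accumulate message with its retraction, which is why the kind has to be left out of the comparison.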