Reo-LEI commented on a change in pull request #2898:
URL: https://github.com/apache/iceberg/pull/2898#discussion_r715845154
##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
##########
@@ -412,10 +417,16 @@ private String operatorName(String suffix) {
switch (writeMode) {
case NONE:
+ if (!equalityFieldIds.isEmpty()) {
+ return input.keyBy(new EqualityFieldKeySelector(equalityFieldIds, iSchema, flinkRowType));
+ }
return input;
case HASH:
if (partitionSpec.isUnpartitioned()) {
+ if (!equalityFieldIds.isEmpty()) {
+ return input.keyBy(new EqualityFieldKeySelector(equalityFieldIds, iSchema, flinkRowType));
+ }
Review comment:
Lines 420 to 429 try to resolve the duplicate rows problem. If equality fields
are specified, then by default records are shuffled by `equalityFieldIds`, for
both unpartitioned and partitioned tables.
All the cases from the `Distribution Mode` discussion are as follows:
Case | Streaming Type | Distribution Mode | Primary Key | Partition Key | Shuffle by
-- | -- | -- | -- | -- | --
1 | retract | NONE | userId | | userId
2 | retract | NONE | userId | day, hour | userId
3 | retract | HASH | userId | | userId
4 | retract | HASH | userId | day, hour | day, hour
5 | upsert | NONE | userId | | userId
6 | upsert | NONE | day, hour, userId | day, hour | day, hour, userId
7 | upsert | HASH | userId | | userId
8 | upsert | HASH | day, hour, userId | day, hour | day, hour
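
The rule behind the table can be written as a small decision function. This is only a sketch of the logic as the table describes it, not the code in `FlinkSink`: the class `ShuffleKeySketch`, its `DistributionMode` enum, and the use of plain field-name lists are all hypothetical stand-ins chosen so the example runs without Flink or Iceberg on the classpath.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper, for illustration only; not part of Iceberg or Flink.
final class ShuffleKeySketch {
    enum DistributionMode { NONE, HASH }

    /**
     * Returns the fields to shuffle by. An empty list means "no shuffle":
     * the input stream is passed through unchanged.
     */
    static List<String> shuffleBy(DistributionMode mode,
                                  List<String> equalityFields,
                                  List<String> partitionFields) {
        switch (mode) {
            case NONE:
                // Cases 1, 2, 5, 6: key by the equality fields when present.
                return equalityFields;
            case HASH:
                if (partitionFields.isEmpty()) {
                    // Cases 3, 7: unpartitioned table, key by the equality fields.
                    return equalityFields;
                }
                // Cases 4, 8: partitioned table, key by the partition fields.
                return partitionFields;
            default:
                throw new IllegalArgumentException("Unsupported mode: " + mode);
        }
    }

    public static void main(String[] args) {
        List<String> pk = Arrays.asList("userId");
        List<String> partition = Arrays.asList("day", "hour");
        // Case 2: NONE mode on a partitioned table.
        System.out.println(shuffleBy(DistributionMode.NONE, pk, partition));
        // Case 4: HASH mode on a partitioned table.
        System.out.println(shuffleBy(DistributionMode.HASH, pk, partition));
        // Case 3: HASH mode on an unpartitioned table.
        System.out.println(shuffleBy(DistributionMode.HASH, pk, Collections.<String>emptyList()));
    }
}
```

Note that the streaming type (retract or upsert) does not appear as a parameter: in the table it changes which fields make up the primary key, but not the rule for choosing the shuffle key.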
According to the solution in
https://github.com/apache/iceberg/pull/2898#issuecomment-926830884, we get
the following job graph for all cases.
```
ChangelogStream-1 ---forward---> Filter/Map-1 ---+      +---> IcebergStreamWriter-1 ---+
       ...                           ...         +-hash-+              ...             +-rebalance---> IcebergFilesCommitter
ChangelogStream-m ---forward---> Filter/Map-m ---+      +---> IcebergStreamWriter-n ---+
```
Now we can ensure that every CDC/upsert record is sent to the correct
`IcebergStreamWriter` and that no -U message is ignored.
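
To illustrate why the hash edge gives this guarantee, here is a minimal sketch of key-based routing. It is a deliberate simplification: the class `KeyedRoutingSketch` and the method `writerIndex` are hypothetical, and the plain `hashCode` modulo parallelism used here only stands in for Flink's real assignment, which goes through key groups. The property that matters is the same in both: the target writer depends only on the key values, so a -U and the +U that follows it for the same key always reach the same subtask.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of keyed routing; not Flink's actual algorithm.
final class KeyedRoutingSketch {
    /** Maps the extracted key values to a writer subtask index in [0, parallelism). */
    static int writerIndex(List<?> keyValues, int parallelism) {
        // floorMod keeps the index non-negative even for negative hash codes.
        return Math.floorMod(keyValues.hashCode(), parallelism);
    }

    public static void main(String[] args) {
        int writers = 4;
        // Two changelog records for the same userId: the retraction (-U)
        // and the new value (+U). Only the key takes part in routing.
        int retractTarget = writerIndex(Arrays.asList("user-42"), writers);
        int updateTarget = writerIndex(Arrays.asList("user-42"), writers);
        System.out.println(retractTarget == updateTarget); // prints true
    }
}
```

With a rebalance or forward edge instead, the two records could land on different writers, and the writer receiving the -U would have no matching row to delete.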