rdblue commented on a change in pull request #2898:
URL: https://github.com/apache/iceberg/pull/2898#discussion_r810411948



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
##########
@@ -412,18 +417,38 @@ private String operatorName(String suffix) {
 
       switch (writeMode) {
         case NONE:
+          if (!equalityFieldIds.isEmpty()) {
+            LOG.info("Distribute rows by equality fields in '{}' distribution mode", DistributionMode.NONE.modeName());
+            return input.keyBy(new EqualityFieldKeySelector(equalityFieldIds, iSchema, flinkRowType));
+          }
+
           return input;
 
         case HASH:
           if (partitionSpec.isUnpartitioned()) {
+            if (!equalityFieldIds.isEmpty()) {
+              LOG.info("Distribute rows by equality fields in '{}' distribution mode, because table is unpartitioned",
+                  DistributionMode.HASH.modeName());
+              return input.keyBy(new EqualityFieldKeySelector(equalityFieldIds, iSchema, flinkRowType));
+            }
+
+            LOG.warn("Fallback to use '{}' distribution mode, because table is unpartitioned",
+                DistributionMode.NONE.modeName());
             return input;
           } else {
+            LOG.info("Distribute rows by partition fields in '{}' distribution mode", DistributionMode.HASH.modeName());

Review comment:
       @Reo-LEI, I don't think that's accurate. The requirement is to detect 
whether the equality distribution is satisfied by the hash distribution. If it 
is, then you can use the hash distribution. If not, then fail.
   
   I think the check should be whether any partition field uses a source column 
that is not part of the identifier fields. For example, in a table with `type`, 
`id`, and `data` and a simple key `id`, the requirement is met by any partition 
transform that uses `id`, like `bucket(256, id)` or `truncate(id, 10)`. But if 
the spec has an additional partition field on `data`, then we can't guarantee 
that all rows with the same `id` value go to the same data partition.
   
   Another case that does work is when the table identifier is made from both 
`type` and `id`. In that case, partitioning by either `type` or `id` 
individually works, and so does partitioning by both `type` and `id`. But again, 
partitioning by `data` ruins the distribution.
   
   So the logic should be: make sure that the partition spec includes at least 
one identifier field column and that all of the partition source fields are 
identifier fields.
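
A minimal sketch of that rule, to make the two conditions concrete. It works on plain sets of field IDs rather than Iceberg types, so the class name `EqualityDistributionCheck`, the method `hashSatisfiesEquality`, and the field IDs (`type`=1, `id`=2, `data`=3) are all hypothetical. In the real sink the first set would presumably be collected from the `sourceId()` of each field in the partition spec and the second would be `equalityFieldIds`.

```java
import java.util.Set;

public class EqualityDistributionCheck {

  /**
   * Returns true when hashing by partition also keeps every row with the
   * same identifier value in the same partition.
   *
   * Hypothetical helper: both arguments are sets of schema field IDs.
   */
  static boolean hashSatisfiesEquality(Set<Integer> partitionSourceIds,
                                       Set<Integer> identifierFieldIds) {
    // Condition 1: the spec must use at least one identifier column.
    // An empty (unpartitioned) spec cannot satisfy the requirement.
    if (partitionSourceIds.isEmpty()) {
      return false;
    }

    // Condition 2: every partition source column must be an identifier
    // column. Together with the non-empty check this implies condition 1.
    return identifierFieldIds.containsAll(partitionSourceIds);
  }

  public static void main(String[] args) {
    // Assumed field IDs for the example table: type=1, id=2, data=3.
    Set<Integer> keyId = Set.of(2);
    Set<Integer> keyTypeAndId = Set.of(1, 2);

    // Key `id`, partitioned by a transform of `id` only: satisfied.
    System.out.println(hashSatisfiesEquality(Set.of(2), keyId));
    // Key `id`, partitioned by `id` and `data`: not satisfied.
    System.out.println(hashSatisfiesEquality(Set.of(2, 3), keyId));
    // Key (`type`, `id`), partitioned by `type` alone: satisfied.
    System.out.println(hashSatisfiesEquality(Set.of(1), keyTypeAndId));
    // Key (`type`, `id`), partitioned by `data`: not satisfied.
    System.out.println(hashSatisfiesEquality(Set.of(3), keyTypeAndId));
  }
}
```

When the check returns false in HASH mode, the caller would fail rather than silently fall back, per the requirement above.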




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


