rdblue commented on a change in pull request #2898:
URL: https://github.com/apache/iceberg/pull/2898#discussion_r799696710



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
##########
@@ -412,18 +417,38 @@ private String operatorName(String suffix) {
 
       switch (writeMode) {
         case NONE:
+          if (!equalityFieldIds.isEmpty()) {
+            LOG.info("Distribute rows by equality fields in '{}' distribution mode", DistributionMode.NONE.modeName());
+            return input.keyBy(new EqualityFieldKeySelector(equalityFieldIds, iSchema, flinkRowType));
+          }
+
           return input;
 
         case HASH:
           if (partitionSpec.isUnpartitioned()) {
+            if (!equalityFieldIds.isEmpty()) {
+              LOG.info("Distribute rows by equality fields in '{}' distribution mode, because table is unpartitioned",
+                  DistributionMode.HASH.modeName());
+              return input.keyBy(new EqualityFieldKeySelector(equalityFieldIds, iSchema, flinkRowType));
+            }
+
+            LOG.warn("Fallback to use '{}' distribution mode, because table is unpartitioned",
+                DistributionMode.NONE.modeName());
             return input;
           } else {
+            LOG.info("Distribute rows by partition fields in '{}' distribution mode", DistributionMode.HASH.modeName());

Review comment:
For HASH, there are two cases: the hash distribution by partition key either satisfies the required equality distribution or it does not. For example, distributing by bucket(16, id) satisfies the requirement for the equality field id, because all rows with the same id value are sent to the same task.
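A minimal sketch of why the bucket example works, under a deliberately simplified routing model: each row goes to subtask `partitionValue mod parallelism`, and `Long.hashCode` stands in for Iceberg's Murmur3-based bucket transform. The class, the `Row` type and the routing function are hypothetical; only the property matters. A partition value derived solely from `id` keeps both versions of an upsert key together, while one derived from `ts` can split them.

```java
import java.time.LocalDate;

// Hypothetical illustration only: not Iceberg or Flink API.
class PartitionRoutingDemo {
    static final int PARALLELISM = 4;

    // A row with an equality (upsert) field `id` and a date field `ts`.
    static final class Row {
        final long id;
        final LocalDate ts;

        Row(long id, LocalDate ts) {
            this.id = id;
            this.ts = ts;
        }
    }

    // Stand-in for bucket(16, id). Iceberg's real transform hashes with Murmur3,
    // so bucket numbers differ, but it is likewise a function of `id` alone.
    static int bucket16(Row row) {
        return (Long.hashCode(row.id) & Integer.MAX_VALUE) % 16;
    }

    // Stand-in for day(ts): days since 1970-01-01.
    static int day(Row row) {
        return (int) row.ts.toEpochDay();
    }

    // Simplified routing model: subtask = partition value mod parallelism.
    static int subtaskFor(int partitionValue) {
        return Math.floorMod(partitionValue, PARALLELISM);
    }

    public static void main(String[] args) {
        // Two versions of the same upsert key (id = 42) written on different days.
        Row insert = new Row(42L, LocalDate.of(2022, 2, 1));
        Row update = new Row(42L, LocalDate.of(2022, 2, 2));

        // Hash by bucket(16, id): both versions reach the same subtask.
        System.out.println("bucket(16, id): " + subtaskFor(bucket16(insert))
                + " vs " + subtaskFor(bucket16(update)));

        // Hash by day(ts): the two versions are split across subtasks.
        System.out.println("day(ts):        " + subtaskFor(day(insert))
                + " vs " + subtaskFor(day(update)));
    }
}
```

Running it prints `2 vs 2` for the bucket case and `0 vs 1` for the day case. Real Flink routing hashes the key again into key groups, but that step is also deterministic in the partition value, so the conclusion carries over.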
   
If the partition distribution satisfies the equality distribution, this should use the partition distribution. Otherwise, it should use the equality distribution to guarantee correctness. It's up to @stevenzwu and you whether the latter case should fail and throw an exception ("The required distribution, hash by partition key, does not correctly group the upsert key") or should log a warning and use the equality distribution.
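One way the rule above could be expressed, as a sketch: a conservative check that every partition field is sourced from an equality field (in Iceberg the source ids would presumably come from each partition field's `PartitionField.sourceId()`), followed by either failing or warning and falling back. `HashDistributionChoice`, `KeyBy`, `choose` and the `failOnMismatch` switch are hypothetical names, and the sketch assumes a partitioned table, since the unpartitioned case is handled separately in the diff.

```java
import java.util.List;
import java.util.Set;
import java.util.logging.Logger;

// Hypothetical sketch of the HASH-mode decision; not part of Iceberg's API.
class HashDistributionChoice {
    private static final Logger LOG = Logger.getLogger(HashDistributionChoice.class.getName());

    enum KeyBy { PARTITION, EQUALITY }

    // Conservative (sufficient) condition: if every partition field is derived only from
    // equality fields, rows with equal upsert keys have equal partition keys, so hashing
    // by partition key sends them to the same task. Example: bucket(16, id) with key {id}.
    static boolean partitionGroupsEqualityKey(List<Integer> partitionSourceIds,
                                              Set<Integer> equalityFieldIds) {
        return equalityFieldIds.containsAll(partitionSourceIds);
    }

    // Assumes a partitioned table: partitionSourceIds must be non-empty.
    static KeyBy choose(List<Integer> partitionSourceIds, Set<Integer> equalityFieldIds,
                        boolean failOnMismatch) {
        if (partitionSourceIds.isEmpty()) {
            throw new IllegalArgumentException("Sketch only covers partitioned tables");
        }
        // No upsert key, or the partition hash already groups it: keep partition distribution.
        if (equalityFieldIds.isEmpty()
                || partitionGroupsEqualityKey(partitionSourceIds, equalityFieldIds)) {
            return KeyBy.PARTITION;
        }
        if (failOnMismatch) {
            throw new IllegalStateException("The required distribution, hash by partition key, "
                    + "does not correctly group the upsert key");
        }
        LOG.warning("Hash by partition key does not group the upsert key; "
                + "using equality field distribution instead");
        return KeyBy.EQUALITY;
    }

    public static void main(String[] args) {
        // Field ids are made up for illustration: 1 = id, 3 = ts.
        System.out.println(choose(List.of(1), Set.of(1), true));   // bucket(16, id), key {id}
        System.out.println(choose(List.of(3), Set.of(1), false));  // day(ts), key {id}: warn
    }
}
```

Running `main` prints `PARTITION` and then `EQUALITY`, with a warning logged for the second call. The subset test is conservative: it may reject a few partitionings that would in fact be safe, but it never accepts one that can split an upsert key, which is the property that matters for correctness.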




-- 
This is an automated message from the Apache Git Service.