rdblue commented on a change in pull request #2898:
URL: https://github.com/apache/iceberg/pull/2898#discussion_r810411948
##########
File path:
flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
##########
@@ -412,18 +417,38 @@ private String operatorName(String suffix) {
switch (writeMode) {
case NONE:
+ if (!equalityFieldIds.isEmpty()) {
+ LOG.info("Distribute rows by equality fields in '{}' distribution mode", DistributionMode.NONE.modeName());
+ return input.keyBy(new EqualityFieldKeySelector(equalityFieldIds, iSchema, flinkRowType));
+ }
+
return input;
case HASH:
if (partitionSpec.isUnpartitioned()) {
+ if (!equalityFieldIds.isEmpty()) {
+ LOG.info("Distribute rows by equality fields in '{}' distribution mode, because table is unpartitioned",
+ DistributionMode.HASH.modeName());
+ return input.keyBy(new EqualityFieldKeySelector(equalityFieldIds, iSchema, flinkRowType));
+ }
+
+ LOG.warn("Fallback to use '{}' distribution mode, because table is unpartitioned",
+ DistributionMode.NONE.modeName());
return input;
} else {
+ LOG.info("Distribute rows by partition fields in '{}' distribution mode", DistributionMode.HASH.modeName());
Review comment:
@Reo-LEI, I don't think that's accurate. The requirement is to detect
whether the equality distribution is satisfied by the hash distribution. If it
is, then you can use the hash distribution. If not, then fail.
I think that check should be whether there are any partition fields that use
a source column that is not part of the identifier fields. For example, a table
with columns `type`, `id`, and `data` and a simple key `id` works with any
partition transform that uses only `id`, like `bucket(256, id)` or
`truncate(id, 10)`. But if the spec has an additional partition field on a
column like `data`, then we can't guarantee that all rows with the same `id`
value go to the same data partition.
Another case that does work is when the table identifier is made from both
`type` and `id`. In that case, partitioning by either `type` or `id`
individually works, and so does partitioning by both `type` and `id`. But
again, partitioning by `data` ruins the distribution.
So the logic should be: make sure that the partition spec includes at least
one identifier field column and that all of the partition source fields are
identifier fields.
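
A minimal sketch of that rule, assuming the partition spec has already been reduced to the source column IDs of its partition fields. The class `DistributionCheck`, the method `hashSatisfiesEquality`, and the column IDs are hypothetical and use plain Java collections, not the Iceberg API or the code in this PR:

```java
import java.util.List;
import java.util.Set;

// Hypothetical helper for illustration only; not part of Iceberg or this PR.
final class DistributionCheck {
  private DistributionCheck() {}

  /**
   * Returns true when distributing by partition also keeps all rows with the
   * same identifier value together.
   *
   * @param partitionSourceIds source column ID of each partition field
   * @param identifierFieldIds column IDs that make up the table identifier
   */
  static boolean hashSatisfiesEquality(
      List<Integer> partitionSourceIds, Set<Integer> identifierFieldIds) {
    // An unpartitioned spec includes no identifier column at all.
    if (partitionSourceIds.isEmpty()) {
      return false;
    }
    // Every partition source column must be an identifier column. Because the
    // list is non-empty, this also means at least one identifier column is used.
    return identifierFieldIds.containsAll(partitionSourceIds);
  }
}
```

With assumed column IDs `type` = 1, `id` = 2, and `data` = 3, a key of `id` passes for a spec on `id` alone and fails once a partition field on `data` is added. A key of `type` and `id` passes for a spec on `type`, on `id`, or on both. When the check fails, the caller would reject the hash distribution rather than silently fall back.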