rdblue commented on a change in pull request #2064:
URL: https://github.com/apache/iceberg/pull/2064#discussion_r559018257



##########
File path: core/src/main/java/org/apache/iceberg/TableProperties.java
##########
@@ -138,6 +138,9 @@ private TableProperties() {
   public static final String ENGINE_HIVE_ENABLED = "engine.hive.enabled";
   public static final boolean ENGINE_HIVE_ENABLED_DEFAULT = false;
 
+  public static final String WRITE_SHUFFLE_BY_PARTITION = 
"write.shuffle-by.partition";

Review comment:
       I think we're all in agreement to consider behavior across engines and 
consider settings that can be used anywhere. One of the major differences 
between batch and streaming is that it would be expensive and unusual to sort 
the records processed by a task, so actually producing records sorted locally 
within a task isn't something we should expect. My guess is that Flink won't 
support either local or global sort -- global is just task-level sort with a 
shared range partitioner.
   
   We do have a need to shuffle data to the write tasks in Flink. Shuffling by 
partitions is a good idea to start with because it covers the cases where hash 
partitioning is balanced. Skew in the partitions that @stevenzwu notes is a 
concern, but I'm also interested in handling skew in non-partition columns. If 
downstream reads are likely to filter by `country` in Steven's example, then 
clustering data by country in Flink is a good idea even if it isn't a partition 
column. Spark uses global ordering to handle this skew, which will estimate 
ranges of the sort keys and produce a partitioner config that balances the data.
   
   For Flink, what makes sense to me is to have options for the key-by 
operation: no key-by, key by `hash(partition)`, or key by `range(sort key)` -- 
where `range(sort key)` is determined using a method like Steven suggests. 
These distribution options correspond to the ones we plan to support in Spark, 
just without the task-level sort.
   
   I propose using a table property, `write.distribution-mode`, in both engines 
with values:
   * `none`: do not shuffle rows
   * `partition`: shuffle rows by `hash(partition)`
   * `sort`: shuffle rows by `range(sort key)` using a ranges provided by a 
skew calculation or from table metrics
   
   I considered a similar mode for sort, but Flink will probably ignore it and 
in Spark we are making it implicit: if the table's sort order is defined, then 
request a sort. So there is no need for an equivalent sort mode property.
   
   If we go with this, then we have one setting that works in both Flink and 
Spark. In Flink the property controls the `keyBy` behavior only, and in Spark 
it is combined with sort order to give the options that we've discussed:
   
   | Spark | `none` | `partition` | `sort` |
   |-|--------|-------------|--------|
   | unordered | no distribution, locally sort by partition key | hash 
distribute by partition key, locally sort by partition key | range distribute 
by partition key, locally sort by partition key |
   | ordered | no distribution, locally sorted | hash distribute by partition 
key, locally sorted | globally sorted |
   
   I think Flink would use a different sort key depending on whether the table 
order is set: it would range distribute by partition key or sort key. Here's an 
equivalent table:
   
   | Flink | `none` | `partition` | `sort` |
   |-|--------|-------------|--------|
   | unordered | no distribution or ordering | hash distribute by partition key 
| range distribute by partition key |
   | ordered | no distribution or ordering | hash distribute by partition key | 
range distribute by sort key |
   
   In the short term, I expect Flink to only support `none` and `partition` 
modes.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to