rdblue commented on a change in pull request #2064:
URL: https://github.com/apache/iceberg/pull/2064#discussion_r559018257
##########
File path: core/src/main/java/org/apache/iceberg/TableProperties.java
##########
@@ -138,6 +138,9 @@ private TableProperties() {
public static final String ENGINE_HIVE_ENABLED = "engine.hive.enabled";
public static final boolean ENGINE_HIVE_ENABLED_DEFAULT = false;
+ public static final String WRITE_SHUFFLE_BY_PARTITION = "write.shuffle-by.partition";
Review comment:
I think we're all in agreement that we should consider behavior across engines
and settings that can be used anywhere. One of the major differences
between batch and streaming is that it would be expensive and unusual to sort
the records processed by a task, so actually producing records sorted locally
within a task isn't something we should expect. My guess is that Flink won't
support either local or global sort -- global is just task-level sort with a
shared range partitioner.
We do have a need to shuffle data to the write tasks in Flink. Shuffling by
partitions is a good idea to start with because it covers the cases where hash
partitioning is balanced. Skew in the partitions that @stevenzwu notes is a
concern, but I'm also interested in handling skew in non-partition columns. If
downstream reads are likely to filter by `country` in Steven's example, then
clustering data by country in Flink is a good idea even if it isn't a partition
column. Spark uses global ordering to handle this skew, which will estimate
ranges of the sort keys and produce a partitioner config that balances the data.
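The range-estimation idea above can be sketched roughly as follows. This is a minimal illustration, not Spark's actual range partitioner: sample the sort key, derive boundaries that split the sample evenly, then route each row by binary search over the boundaries. All class and method names here are made up.

```java
import java.util.Arrays;

// Illustrative sketch of range-based task assignment (not a real Iceberg or
// Spark API): compute balanced key-range boundaries from a sample, then
// route each row to the task whose range contains its sort key.
public class RangePartitionSketch {
  // Compute numTasks - 1 boundary values from a sample so each range holds
  // roughly the same number of sampled keys.
  static long[] boundaries(long[] sampleKeys, int numTasks) {
    long[] sorted = sampleKeys.clone();
    Arrays.sort(sorted);
    long[] bounds = new long[numTasks - 1];
    for (int i = 1; i < numTasks; i++) {
      bounds[i - 1] = sorted[(int) ((long) i * sorted.length / numTasks)];
    }
    return bounds;
  }

  // Route a key to a task via binary search over the boundaries; keys equal
  // to a boundary go to the lower task.
  static int taskFor(long key, long[] bounds) {
    int pos = Arrays.binarySearch(bounds, key);
    return pos >= 0 ? pos : -pos - 1;
  }

  public static void main(String[] args) {
    // Boundaries from an evenly spread sample split the key space into
    // balanced ranges for 4 writer tasks.
    long[] bounds = boundaries(new long[] {1, 2, 3, 4, 5, 6, 7, 8}, 4);
    System.out.println("boundaries: " + Arrays.toString(bounds));
    System.out.println("key 2 -> task " + taskFor(2, bounds));
    System.out.println("key 9 -> task " + taskFor(9, bounds));
  }
}
```

In a real engine the sample would come from a scan or from table metrics, and skewed keys would get more than one range; the sketch only shows the shape of the boundary/lookup split.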
For Flink, what makes sense to me is to have options for the key-by
operation: no key-by, key by `hash(partition)`, or key by `range(sort key)` --
where `range(sort key)` is determined using a method like Steven suggests.
These distribution options correspond to the ones we plan to support in Spark,
just without the task-level sort.
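A key by `hash(partition)` amounts to routing every row with the same partition tuple to the same writer task. The sketch below illustrates that routing in plain Java; it is not Iceberg or Flink code, and the names are made up.

```java
import java.util.Objects;

// Illustrative sketch (not an Iceberg or Flink API): "partition" mode keys
// each row by the hash of its partition tuple, so all rows for a given
// partition land on the same writer task.
public class HashByPartitionSketch {
  // Pick a writer task from the partition values; Math.floorMod keeps the
  // result non-negative even when the hash code is negative.
  static int taskFor(Object[] partitionTuple, int numWriters) {
    return Math.floorMod(Objects.hash(partitionTuple), numWriters);
  }

  public static void main(String[] args) {
    Object[] p1 = {"2021-01-15", "US"};
    Object[] p2 = {"2021-01-15", "FR"};
    System.out.println("p1 -> writer " + taskFor(p1, 8));
    System.out.println("p2 -> writer " + taskFor(p2, 8));
    // The same partition tuple always routes to the same writer:
    System.out.println(taskFor(p1, 8) == taskFor(p1.clone(), 8)); // prints "true"
  }
}
```

This is exactly where the skew concern comes from: a hot partition maps to a single writer no matter how many writers there are, which is what the `range(sort key)` option is meant to address.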
I propose using a table property, `write.distribution-mode`, in both engines
with values:
* `none`: do not shuffle rows
* `partition`: shuffle rows by `hash(partition)`
* `sort`: shuffle rows by `range(sort key)` using ranges provided by a
skew calculation or from table metrics
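To make the proposal concrete, here is one way an engine could read the property. This is a hypothetical sketch: the enum, the helper, and the `none` default are illustrative assumptions, not part of Iceberg's API.

```java
import java.util.Locale;
import java.util.Map;

// Hypothetical sketch of reading the proposed write.distribution-mode
// property; the enum and the default are assumptions for illustration.
public class DistributionModeSketch {
  enum DistributionMode { NONE, PARTITION, SORT }

  static final String WRITE_DISTRIBUTION_MODE = "write.distribution-mode";

  static DistributionMode fromProperties(Map<String, String> props) {
    // Assume "none" as the default: do not shuffle rows unless asked to.
    String value = props.getOrDefault(WRITE_DISTRIBUTION_MODE, "none");
    return DistributionMode.valueOf(value.toUpperCase(Locale.ROOT));
  }

  public static void main(String[] args) {
    System.out.println(
        fromProperties(Map.of(WRITE_DISTRIBUTION_MODE, "partition"))); // prints PARTITION
  }
}
```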
I considered a similar mode for sort, but Flink will probably ignore it and
in Spark we are making it implicit: if the table's sort order is defined, then
request a sort. So there is no need for an equivalent sort mode property.
If we go with this, then we have one setting that works in both Flink and
Spark. In Flink the property controls the `keyBy` behavior only, and in Spark
it is combined with sort order to give the options that we've discussed:
| Spark | `none` | `partition` | `sort` |
|-|--------|-------------|--------|
| unordered | no distribution or ordering | hash distribute by partition key | range distribute by partition key |
| ordered | no distribution, locally sorted | hash distribute by partition key, locally sorted | globally sorted |
I think Flink would use a different sort key depending on whether the table
order is set: it would range distribute by partition key or sort key. Here's an
equivalent table:
| Flink | `none` | `partition` | `sort` |
|-|--------|-------------|--------|
| unordered | no distribution or ordering | hash distribute by partition key | range distribute by partition key |
| ordered | no distribution or ordering | hash distribute by partition key | range distribute by sort key |
In the short term, I expect Flink to only support `none` and `partition`
modes.