stevenzwu commented on a change in pull request #2064:
URL: https://github.com/apache/iceberg/pull/2064#discussion_r555444419
##########
File path: core/src/main/java/org/apache/iceberg/TableProperties.java
##########
@@ -138,6 +138,9 @@ private TableProperties() {
public static final String ENGINE_HIVE_ENABLED = "engine.hive.enabled";
public static final boolean ENGINE_HIVE_ENABLED_DEFAULT = false;
+ public static final String WRITE_SHUFFLE_BY_PARTITION = "write.shuffle-by.partition";
Review comment:
+1 on more generalized semantics. The current option only works if the data
is relatively evenly distributed across table partitions; otherwise, heavy data
skew can be problematic for the writers. The other problem is that the effective
writer parallelism is now limited by the number of partition values. Say the
writer parallelism is 100 but there are only 10 unique partition values: then
only 10 writer subtasks will ever receive data.
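To make the parallelism cap concrete, here is a minimal Flink sketch (the
records, schema, and class name are hypothetical placeholders for illustration,
not anything in this PR):
```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KeyBySkewSketch {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(100); // 100 writer subtasks

    // (partition value, payload) records; pretend only 10 distinct
    // partition values exist in the whole stream.
    DataStream<Tuple2<String, String>> records = env.fromElements(
        Tuple2.of("2021-01-01", "a"),
        Tuple2.of("2021-01-02", "b"));

    // keyBy on the partition value: every record with the same value goes
    // to one subtask, so at most 10 of the 100 subtasks ever receive data.
    records.keyBy(t -> t.f0).print();

    env.execute("keyBy-parallelism-cap");
  }
}
```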
I will also add some notes on the streaming write mode. In a streaming job, it
is probably impossible to do true sorting. Instead, what can be useful is some
sort of "groupBy/bucketing" shuffle in the streaming sink. It can help reduce
the number of concurrently open files per writer and improve read performance
(predicate pushdown) through better data locality.
E.g., a table is partitioned by (event_date, country). Without the shuffle,
each writer task can write to ~200 files/countries. However, a simple keyBy is
also problematic, as it can produce heavy data skew for countries like the US.
Instead, we should calculate stats for each bucket/country and distribute the
data based on the weight of each bucket. E.g., we may allocate 100 downstream
subtasks to the US, while allocating 1 downstream subtask to multiple small
countries combined (like bin packing).
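A minimal sketch of that weighted distribution using Flink's partitionCustom;
the CountryWeightPartitioner name and the precomputed weights map are
assumptions for illustration, not part of this PR:
```java
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.flink.api.common.functions.Partitioner;

// Hypothetical partitioner: spreads a heavy country over a range of
// subtasks proportional to its (externally computed) traffic weight.
public class CountryWeightPartitioner implements Partitioner<String> {
  private final Map<String, Double> weights; // country -> fraction of traffic

  public CountryWeightPartitioner(Map<String, Double> weights) {
    this.weights = weights;
  }

  @Override
  public int partition(String country, int numPartitions) {
    // A heavy country (e.g. US with weight 0.5 and 200 subtasks) gets a
    // range of ~100 subtasks; tiny countries get a range of 1 and thus
    // end up sharing subtasks. Ranges may overlap in this sketch; a real
    // implementation would carve out disjoint ranges from global stats.
    double weight = weights.getOrDefault(country, 0.0);
    int range = Math.max(1, (int) Math.round(weight * numPartitions));
    int start = Math.floorMod(country.hashCode(), numPartitions);
    return (start + ThreadLocalRandom.current().nextInt(range)) % numPartitions;
  }
}
```
It would be wired in before the writer with something like
records.partitionCustom(new CountryWeightPartitioner(weights), keySelector),
with the weights refreshed from the aggregated stats.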
This can also be extended to non-partition columns (as logical partitioning),
which can improve read performance with filtering. It is similar to the above
example, with the tweak that country is no longer a partition column; the
groupBy/bucketing shuffle can still help improve data locality.
I was thinking about a groupBy operator where each subtask (running in a
taskmanager) constantly reports local statistics to the operator coordinator
(running in the jobmanager), which then does the global aggregation and
notifies the subtasks of the globally aggregated stats.
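A plain-Java sketch of the merge step the coordinator would perform, assuming
each subtask ships a map of per-key record counts; this is illustrative only,
not the actual Flink OperatorCoordinator API:
```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical coordinator-side aggregation: merge the per-subtask local
// counts into global counts, then turn them into the weights consumed by
// the shuffle (e.g. by the CountryWeightPartitioner sketch above).
public class StatsAggregationSketch {
  public static Map<String, Double> toWeights(List<Map<String, Long>> localStats) {
    Map<String, Long> global = new HashMap<>();
    for (Map<String, Long> local : localStats) {
      local.forEach((key, count) -> global.merge(key, count, Long::sum));
    }
    long total = global.values().stream().mapToLong(Long::longValue).sum();
    Map<String, Double> weights = new HashMap<>();
    global.forEach((key, count) ->
        weights.put(key, total == 0 ? 0.0 : (double) count / total));
    return weights;
  }
}
```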