stevenzwu commented on a change in pull request #2064:
URL: https://github.com/apache/iceberg/pull/2064#discussion_r555444419



##########
File path: core/src/main/java/org/apache/iceberg/TableProperties.java
##########
@@ -138,6 +138,9 @@ private TableProperties() {
   public static final String ENGINE_HIVE_ENABLED = "engine.hive.enabled";
   public static final boolean ENGINE_HIVE_ENABLED_DEFAULT = false;
 
+  public static final String WRITE_SHUFFLE_BY_PARTITION = "write.shuffle-by.partition";

Review comment:
       +1 on more generalized semantics. The current option only works if the data is relatively evenly distributed across table partitions; otherwise, heavy data skew can be problematic for writers. The other problem is that the effective writer parallelism is now limited by the number of distinct partition values. Say the writer parallelism is 100 but there are only 10 unique partition values: then only 10 writer subtasks will receive data.
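A minimal sketch of that cap (illustrative Java only, not Iceberg or Flink code; the hash-routing here just mimics how a keyBy maps keys to subtasks):

```java
import java.util.HashSet;
import java.util.Set;

// With a hash-based keyBy, the number of busy writer subtasks is bounded by
// the number of distinct partition values, regardless of configured parallelism.
class KeyByUtilization {
  // Count how many subtasks actually receive data when there are only
  // `distinctPartitionValues` unique keys routed across `parallelism` subtasks.
  public static int busySubtasks(int parallelism, int distinctPartitionValues) {
    Set<Integer> used = new HashSet<>();
    for (int key = 0; key < distinctPartitionValues; key++) {
      // key -> subtask routing via hashing, as a keyBy would do
      used.add(Math.floorMod(Integer.hashCode(key * 31 + 7), parallelism));
    }
    return used.size();
  }

  public static void main(String[] args) {
    // 100 writer subtasks but only 10 unique partition values:
    System.out.println("busy subtasks: " + busySubtasks(100, 10)); // prints 10
  }
}
```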
   
   I will add some notes for the streaming write mode. In a streaming job, true sorting is probably impossible. Instead, what can be useful is some sort of "groupBy/bucketing" shuffle in the streaming sink. It can reduce the number of concurrently open files per writer and improve read performance (predicate pushdown) through better data locality.
   
   E.g., a table is partitioned by (event_date, country). Without the shuffle, each writer task can write to ~200 files/countries. However, a simple keyBy is also problematic, as it can produce heavy data skew for countries like US. Instead, we should calculate stats for each bucket/country and distribute the data based on the weight of each bucket. E.g., we may allocate 100 downstream subtasks for US while allocating 1 downstream subtask to multiple small countries (like bin packing).
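A hypothetical sketch of that weight-based allocation (the class, method names, and numbers are mine for illustration, not Iceberg APIs): each key gets a share of downstream subtasks proportional to its observed weight, with a floor of one, so many small keys effectively share a subtask.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Allocate downstream subtasks to partition keys proportionally to their
// observed record counts ("weights"), bin-packing style: heavy keys get many
// subtasks, light keys get the minimum of one.
class WeightedAssignment {
  public static Map<String, Integer> subtasksPerKey(Map<String, Long> weights, int parallelism) {
    long total = weights.values().stream().mapToLong(Long::longValue).sum();
    Map<String, Integer> out = new LinkedHashMap<>();
    for (Map.Entry<String, Long> e : weights.entrySet()) {
      // proportional share, never less than one subtask per key
      int share = (int) Math.max(1, Math.round((double) e.getValue() / total * parallelism));
      out.put(e.getKey(), share);
    }
    return out;
  }

  public static void main(String[] args) {
    Map<String, Long> weights = new LinkedHashMap<>();
    weights.put("US", 1_000_000L); // heavy key
    weights.put("IS", 1_000L);     // light keys each get the 1-subtask floor
    weights.put("MT", 1_000L);
    System.out.println(subtasksPerKey(weights, 100)); // {US=100, IS=1, MT=1}
  }
}
```

A production version would also need to reconcile over-allocation (the shares here can sum to more than the parallelism) and to route individual records to one of a key's assigned subtasks.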
   
   This can also be extended to non-partition columns (as logical partitioning), which can improve read performance with filtering. Take the above example with the tweak that country is no longer a partition column: a groupBy/bucketing shuffle can still improve data locality.
   
   I was thinking about a groupBy/orderBy operator where each subtask (running in a taskmanager) constantly reports local statistics to an operator coordinator (running in the jobmanager), which then does the global aggregation and notifies subtasks with the globally aggregated stats.
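The coordinator-side aggregation step could look like the following sketch (plain Java, mirroring the idea of a Flink OperatorCoordinator without using any Flink APIs; names are illustrative):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Each subtask periodically reports a local histogram of key -> record count;
// the coordinator merges them into a single global histogram that it can then
// broadcast back to subtasks to drive the weighted shuffle.
class StatsAggregation {
  public static Map<String, Long> aggregate(List<Map<String, Long>> localStats) {
    Map<String, Long> global = new HashMap<>();
    for (Map<String, Long> local : localStats) {
      // sum counts for keys seen by multiple subtasks
      local.forEach((key, count) -> global.merge(key, count, Long::sum));
    }
    return global;
  }

  public static void main(String[] args) {
    Map<String, Long> subtask0 = Map.of("US", 500L, "FR", 20L);
    Map<String, Long> subtask1 = Map.of("US", 700L, "IS", 3L);
    System.out.println(aggregate(List.of(subtask0, subtask1)));
    // US -> 1200, FR -> 20, IS -> 3
  }
}
```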




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
