[ 
https://issues.apache.org/jira/browse/HUDI-472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17005976#comment-17005976
 ] 

Ethan Guo commented on HUDI-472:
--------------------------------

I've put up a WIP PR regarding this.

The rationale for the original design choice of applying sortBy() to all deduped 
records inside bulkInsertInternal() for bulk_insert is that the records end up 
ordered by data partition path + record key across all RDD partitions, and each 
RDD partition holds a similar number of records.  This ensures that, while 
iterating through the records, the data partition path of the next record 
either stays the same as the current one or increases monotonically.  As a 
result, only one parquet file writer (CopyOnWriteInsertHandler) needs to be 
open at a time, and memory pressure stays very low (few records need to be 
cached in memory) while still meeting the file sizing requirements.  The 
resulting parquet files within each data partition have mutually exclusive 
record key ranges, making it efficient to find the file containing a specific 
record key.
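
A minimal Python sketch of that invariant (illustrative only; names like write_sorted and the record shape are made up, not Hudi's actual code): when records arrive pre-sorted by (partition path, record key), a single writer is rolled forward instead of keeping many open.

```python
# Hypothetical sketch: records pre-sorted by (partition_path, record_key)
# let one writer at a time produce size-bounded files with disjoint key
# ranges per data partition.

def write_sorted(records, max_file_size=2):
    """records: iterable of (partition_path, record_key), pre-sorted."""
    files = []                 # (partition_path, [keys]) per file written
    current_partition = None
    buffer = []

    def roll():
        nonlocal buffer
        if buffer:
            files.append((current_partition, list(buffer)))
            buffer = []

    for partition, key in records:
        if partition != current_partition:
            roll()             # close the writer for the previous partition
            current_partition = partition
        buffer.append(key)
        if len(buffer) >= max_file_size:
            roll()             # rotate the file for sizing, same partition
    roll()
    return files
```

Because the input never revisits an earlier partition path, roll() is the only bookkeeping needed, and the key ranges of consecutive files within a partition cannot overlap.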

Three nuances here regarding this design choice:

(1) Repartitioning the deduped records based only on the data partition path 
may not work well if the input records are highly skewed across data partition 
paths, e.g., one data partition has much more data than the others.  The 
corresponding RDD partition after repartitioning then has far more data to 
process than the other RDD partitions, underutilizing the parallelism.
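
A small sketch of the skew effect (the partitioner and record counts are made up for illustration; this is not Spark's HashPartitioner): every record of a hot partition path lands in the same RDD partition.

```python
# Illustrative: hashing only on partition_path routes a hot path's entire
# load to one RDD partition, however many partitions exist.
import hashlib
from collections import Counter

def bucket(partition_path, n):
    # deterministic stand-in for a hash partitioner
    return int(hashlib.md5(partition_path.encode()).hexdigest(), 16) % n

def records_per_rdd_partition(records, num_rdd_partitions):
    counts = Counter()
    for partition_path, _key in records:
        counts[bucket(partition_path, num_rdd_partitions)] += 1
    return counts
```

With 90% of records under one path, at least one RDD partition receives 90% of the data regardless of the partition count.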

(2) If no sorting is done for the records within an RDD partition, there is 
higher memory pressure on the executor: it must keep multiple parquet file 
writers (CopyOnWriteInsertHandler) open and hold records in memory for 
batching and efficient writes (the max memory usage can reach min(data size of 
the RDD partition, number of possible data partition paths * buffer size)). 
 This does not scale well for TB-scale bulk_insert, as the amount of data can 
easily surpass the memory available.
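
The writer-count difference can be sketched directly (peak_open_writers is a hypothetical name; real writers also buffer records, which is where the memory bound above comes from):

```python
# Illustrative: with unsorted input an executor accumulates one open writer
# per distinct partition path seen; with sorted input it can close the
# previous writer every time the path changes.

def peak_open_writers(records, sorted_input=False):
    open_writers = set()
    current = None
    peak = 0
    for partition_path, _key in records:
        if sorted_input and partition_path != current:
            open_writers.clear()       # previous writer can be closed
            current = partition_path
        open_writers.add(partition_path)
        peak = max(peak, len(open_writers))
    return peak
```

Multiplying the peak by the per-writer buffer size gives the second term of the min(...) bound above.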

(3) If partition path + record key is not sorted globally, i.e., many RDD 
partitions may contain records from overlapping data partition paths, each data 
partition ends up with many small parquet files written by many RDD partitions, 
with possibly overlapping record key ranges (min and max).  Looking up a 
record key then takes a performance hit.
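
A toy sketch of the small-file outcome (files_written and the record shape are illustrative, not Hudi's write path): each RDD partition emits its own file into every data partition it touches, and the per-file key ranges can overlap, so a key lookup must probe several files.

```python
# Illustrative: two RDD partitions both write into data partition "p",
# producing two files whose (min_key, max_key) ranges overlap.

def files_written(rdd_partitions):
    """rdd_partitions: list of lists of (data_partition, key) records.
    Returns {data_partition: [(min_key, max_key), ...]}, one range per file."""
    files = {}
    for part_records in rdd_partitions:
        by_path = {}
        for path, key in part_records:
            by_path.setdefault(path, []).append(key)
        for path, keys in by_path.items():
            files.setdefault(path, []).append((min(keys), max(keys)))
    return files
```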

These design choices target very large-scale bulk_insert.  For relatively 
small bulk_insert, they may incur unnecessary overhead without much benefit to 
the user.  So the WIP PR addresses this by providing one knob to disable 
sorting and another knob to choose the sort mode (global sort, or local sort 
within each RDD partition), so that these can be configured based on the 
specific workload.  Again, the performance of bulk_insert depends on both the 
profile of the input data and the configs used.
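
The knobs could be dispatched roughly as follows (a sketch only; prepare_records and the mode names here are hypothetical and may not match the PR's config names):

```python
# Hypothetical dispatcher over the proposed sort modes: a global sort across
# RDD partitions, a local sort within each RDD partition, or no sorting.

def prepare_records(rdd_partitions, sort_mode="GLOBAL_SORT"):
    if sort_mode == "GLOBAL_SORT":
        # re-range and sort all records by (partition path, record key)
        flat = sorted(r for part in rdd_partitions for r in part)
        size = -(-len(flat) // len(rdd_partitions))   # ceil: even-sized ranges
        return [flat[i:i + size] for i in range(0, len(flat), size)]
    if sort_mode == "PARTITION_SORT":
        # sort each RDD partition independently, no shuffle
        return [sorted(part) for part in rdd_partitions]
    if sort_mode == "NONE":
        return rdd_partitions
    raise ValueError("unknown sort mode: %s" % sort_mode)
```

GLOBAL_SORT buys the single-writer property at shuffle cost; PARTITION_SORT keeps writers bounded per task without a shuffle; NONE skips all of it for small workloads.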

I have another thought that has not been implemented in the PR.  sortBy() in 
Spark has two general stages: (1) sampling the records, determining the record 
key range for each RDD partition, and shuffling the data based on those 
ranges; (2) sorting within each RDD partition.  One tradeoff we can make here 
is to keep stage 1 but skip stage 2.  Instead, we keep multiple parquet file 
writers open while writing the records.  In this case, the number of writers 
stays bounded, since range partitioning has already been done in stage 1. 
 This may work for intermediate-sized bulk_insert.
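
A sketch of that tradeoff (range_partition is an illustrative stand-in for Spark's range partitioning, not its actual sampler): stage 1 samples the sort keys and shuffles records into contiguous key ranges, but records inside each range stay in arrival order.

```python
# Illustrative: sample keys to pick range boundaries (stage 1 of sortBy),
# route records to their range, and skip the per-range sort (stage 2).
# Each task then only needs writers for the paths inside its bounded range.
import bisect

def range_partition(records, num_ranges):
    # stage 1: sample the sort keys and derive num_ranges - 1 boundaries
    sample = sorted(records)
    step = max(1, len(sample) // num_ranges)
    bounds = [sample[i] for i in range(step, len(sample), step)][:num_ranges - 1]
    # shuffle: route each record to its range, preserving arrival order
    ranges = [[] for _ in range(len(bounds) + 1)]
    for r in records:
        ranges[bisect.bisect_right(bounds, r)].append(r)
    return ranges
```

The ranges are mutually disjoint and ordered, so cross-range file key overlap is avoided even though no local sort was paid for.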

> Make sortBy() inside bulkInsertInternal() configurable for bulk_insert
> ----------------------------------------------------------------------
>
>                 Key: HUDI-472
>                 URL: https://issues.apache.org/jira/browse/HUDI-472
>             Project: Apache Hudi (incubating)
>          Issue Type: Improvement
>          Components: Performance
>            Reporter: Ethan Guo
>            Assignee: He ZongPing
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 10m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)
