[
https://issues.apache.org/jira/browse/HUDI-472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17005976#comment-17005976
]
Ethan Guo commented on HUDI-472:
--------------------------------
I've put up a WIP PR regarding this.
The original rationale for applying sortBy() to all deduped records inside
bulkInsertInternal() for bulk_insert is that the records end up globally
ordered by data partition path + record key across all RDD partitions, with
each RDD partition holding a similar number of records. This guarantees that,
while iterating through the records, the data partition path of the next
record either stays the same as the current one or increases monotonically.
As a result, only one parquet file writer (CopyOnWriteInsertHandler) needs to
be open at a time, and memory pressure stays very low (few records need to be
cached in memory) while still meeting the file sizing requirements. The
resulting parquet files within each data partition have mutually exclusive
record key ranges, making it efficient to locate the file containing a
specific record key.
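The single-writer property can be illustrated with a plain-Python sketch (not actual Hudi code; the list of (path, keys) pairs below is a stand-in for parquet files):

```python
# Once records are globally sorted by (partition_path, record_key), the path
# only ever stays the same or increases, so one open writer suffices and each
# "file" receives a disjoint, sorted key range.

def write_sorted(records):
    """records: (partition_path, record_key) tuples, globally sorted."""
    files = []              # stand-ins for parquet files: (path, [keys])
    current_path = None
    for path, key in records:
        if path != current_path:
            # Close the previous writer, open a new one: at most one is open.
            files.append((path, []))
            current_path = path
        files[-1][1].append(key)
    return files

records = sorted([("2020/01/02", "k1"), ("2020/01/01", "k3"),
                  ("2020/01/01", "k1"), ("2020/01/02", "k2")])
files = write_sorted(records)
# files == [("2020/01/01", ["k1", "k3"]), ("2020/01/02", ["k1", "k2"])]
```

Note that within each data partition the key lists come out sorted and non-overlapping, which is what makes the later record-key lookup efficient.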
Three nuances here regarding this design choice:
(1) Repartitioning the deduped records based only on the data partition path
may not work well if the input records are highly skewed across data
partition paths, e.g., one data partition has much more data than the others.
The corresponding RDD partition after repartitioning then has far more data
to process than the rest, underutilizing the available parallelism.
(2) If no sorting is done for the records within an RDD partition, there is
higher memory pressure on the executor: it must keep multiple parquet file
writers (CopyOnWriteInsertHandler) open and buffer records in memory for
batching and efficient writes (the peak memory usage can be min(data size of
the RDD partition, number of possible data partition paths * buffer size)).
This does not scale well for TB-scale bulk_insert, where the amount of data
can easily surpass the memory available.
(3) If partition path + record key is not sorted globally, i.e., many RDD
partitions contain records from overlapping data partition paths, each data
partition ends up with many small parquet files written by many RDD
partitions, possibly with overlapping record key ranges (min and max).
Seeking a record key then takes a performance hit.
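Nuance (2) can be made concrete with a small sketch: a writer must open at a path's first record and can close only after its last, so interleaved paths force several writers (and their buffers) to be open simultaneously.

```python
# Plain-Python sketch of the writer-count cost when records are not sorted
# within an RDD partition. peak_open_writers() counts the best any streaming
# writer can do: open at a path's first record, close after its last.

def peak_open_writers(records):
    """records: (partition_path, record_key) tuples in arrival order."""
    first, last = {}, {}
    for i, (path, _) in enumerate(records):
        first.setdefault(path, i)
        last[path] = i
    peak = open_count = 0
    for i, (path, _) in enumerate(records):
        if first[path] == i:
            open_count += 1
            peak = max(peak, open_count)
        if last[path] == i:
            open_count -= 1
    return peak

interleaved = [("p1", "k9"), ("p2", "k1"), ("p3", "k5"),
               ("p1", "k2"), ("p2", "k7"), ("p3", "k3")]
peak_open_writers(interleaved)          # -> 3 (all paths open at once)
peak_open_writers(sorted(interleaved))  # -> 1 after a local sort
```

With a local sort the peak drops back to one writer, but the per-partition memory bound min(data size of the RDD partition, number of paths * buffer size) still applies in the unsorted case.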
These design choices target very large-scale bulk_insert. For relatively
small bulk_insert, they may incur unnecessary overhead without benefiting the
user. The WIP PR therefore addresses this by providing one knob to disable
sorting and another knob to choose the sort mode (global sort, or local sort
within each RDD partition), so that they are configurable for the specific
workload. Again, the performance of bulk_insert depends on both the profile
of the input data and the configs used.
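As a hedged sketch of how such knobs could shape the pre-write step (the mode names below are illustrative, not the actual option names in the PR):

```python
# Hypothetical sort modes for bulk_insert; actual names in the PR may differ.
GLOBAL_SORT = "GLOBAL_SORT"        # sortBy over partition path + record key
PARTITION_SORT = "PARTITION_SORT"  # sort only within each RDD partition
NONE = "NONE"                      # no sorting at all

def prepare_records(records, num_partitions, mode=GLOBAL_SORT):
    """Plain-Python stand-in for the repartition/sort step before writing."""
    if mode == GLOBAL_SORT:
        # Globally sort, then split into contiguous chunks: each "RDD
        # partition" gets a disjoint (path, key) range of similar size.
        ordered = sorted(records)
        n = max(1, -(-len(ordered) // num_partitions))  # ceil division
        return [ordered[i:i + n] for i in range(0, len(ordered), n)]
    # Otherwise mimic an unsorted shuffle by hashing the partition path.
    parts = [[] for _ in range(num_partitions)]
    for rec in records:
        parts[hash(rec[0]) % num_partitions].append(rec)
    if mode == PARTITION_SORT:
        for p in parts:
            p.sort()
    return parts
```

GLOBAL_SORT keeps the single-writer, disjoint-key-range guarantees at the cost of a full shuffle; PARTITION_SORT keeps one writer open at a time per partition but allows overlapping key ranges across files; NONE skips all sorting overhead and accepts the memory and small-file costs above.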
I have another thought that has not been implemented in the PR. sortBy() in
Spark has two general stages: (1) sample all the records to determine the
record key range for each RDD partition, then shuffle the data based on those
ranges; (2) sort within each RDD partition. One tradeoff we can make here is
to keep stage 1 but skip stage 2, and instead keep multiple parquet file
writers open while writing the records. In this case, the number of writers
should stay bounded, since the range partitioning in stage 1 already limits
how many data partition paths land in each RDD partition. This may work for
intermediate-sized bulk_insert.
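A minimal sketch of this "stage 1 only" idea, approximating in plain Python what Spark's range partitioner does via sampling (the sample size and boundary selection here are simplified assumptions):

```python
import bisect
import random

def range_partition(records, num_partitions, sample_size=20, seed=0):
    """records: (partition_path, record_key) tuples; returns one list per range.

    Stage 1 only: sample records to choose range boundaries, shuffle by range,
    but do NOT sort within each partition. Ranges are disjoint, so each
    partition only sees the paths that fall in its range, bounding how many
    writers it must keep open.
    """
    rng = random.Random(seed)
    sample = sorted(rng.sample(records, min(sample_size, len(records))))
    # Pick num_partitions - 1 boundaries from the sorted sample.
    step = max(1, len(sample) // num_partitions)
    bounds = sample[step::step][:num_partitions - 1]
    parts = [[] for _ in range(len(bounds) + 1)]
    for rec in records:
        parts[bisect.bisect_right(bounds, rec)].append(rec)
    return parts
```

Within each output partition the records remain unsorted, but the (path, key) ranges of different partitions never overlap, which is what keeps the per-executor writer count bounded without paying for the local sort.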
> Make sortBy() inside bulkInsertInternal() configurable for bulk_insert
> ----------------------------------------------------------------------
>
> Key: HUDI-472
> URL: https://issues.apache.org/jira/browse/HUDI-472
> Project: Apache Hudi (incubating)
> Issue Type: Improvement
> Components: Performance
> Reporter: Ethan Guo
> Assignee: He ZongPing
> Priority: Major
> Labels: pull-request-available
> Time Spent: 10m
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)