[ 
https://issues.apache.org/jira/browse/HUDI-2949?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexey Kudinkin updated HUDI-2949:
----------------------------------
    Description: 
These observations come from running Layout Optimization (Clustering) on a [small Amazon Reviews|https://s3.amazonaws.com/amazon-reviews-pds/readme.html] dataset (4.5 GB, reduced).
h2. *Major*

 * GC is taking up to *25%* of CPU cycles (a lot of churn)
 ** A lot of ArrayList resizing like in the code below

 
{code:java}
// Creating empty list, then immediately inserting
List<Object> values = new ArrayList<>();
values.addAll(JavaConverters.bufferAsJavaListConverter(row.toSeq().toBuffer()).asJava());
values.add(hilbertValue);{code}
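Pre-sizing the list would avoid the repeated {{ArrayList}} resizing; a minimal sketch (not the actual patch), assuming the row arity is available via {{Row.size()}}:
{code:java}
// Sketch only: size the list for all row fields plus the Hilbert value up front,
// so the backing array never has to grow and be copied while it is filled.
List<Object> values = new ArrayList<>(row.size() + 1);
values.addAll(JavaConverters.bufferAsJavaListConverter(row.toSeq().toBuffer()).asJava());
values.add(hilbertValue);{code}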
 
 ** A lot of avoidable boxing, like the following

{code:java}
// Collecting as Longs, then unboxing into longs
List<Long> longList = fieldMap.entrySet().stream().map(...)
byte[] hilbertValue = HilbertCurveUtils.indexBytes(
    hilbertCurve, longList.stream().mapToLong(l -> l).toArray(), 63);{code}
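Collecting straight into a primitive {{long[]}} would skip the boxing round-trip; a rough sketch, where {{toLongValue}} is a hypothetical stand-in for whatever per-field mapping the real code applies:
{code:java}
// Sketch only: map each field directly to a primitive long, so no Long boxes
// are allocated and no second unboxing pass is needed before indexBytes.
long[] longValues = fieldMap.entrySet().stream()
    .mapToLong(entry -> toLongValue(entry.getValue()))  // toLongValue is a hypothetical stand-in
    .toArray();
byte[] hilbertValue = HilbertCurveUtils.indexBytes(hilbertCurve, longValues, 63);{code}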
 * Up to *20%* of wall-clock time is spent under locks in {{BoundedInMemoryExecutor}} ({{LinkedBlockingQueue}})
 * ~35% of wall-clock time is spent Gzip-compressing the output

h2. *Per stage*

 
The experiment can roughly be broken down into the following stages:
 * {_}Bulk-insert{_}: bulk-inserting the raw data into the Hudi table
 * {_}Sorting{_}: re-sorting the data according to the Layout Optimization config (reshuffling)
 * {_}Bulk-insert (of the sorted){_}: bulk-inserting the reshuffled data

 
h4. Bulk Insert

{_}Memory Allocated Total{_}: 22,000 samples x 500 KB (sampling interval) ~= 11 GB
{_}GC{_}: 6%
 
_Observations_
 * *~30%* of CPU is spent on Gzip compression
 ** Created HUDI-2928 to flip the default from gzip to zstd (see the config sketch below)
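Until the default changes, the codec can also be overridden per write; a minimal sketch, assuming a Spark datasource write and the {{hoodie.parquet.compression.codec}} write config (the key/value and the {{df}} / {{basePath}} names below are illustrative and should be verified against the Hudi version in use):
{code:java}
// Sketch only: override the Parquet codec for this Hudi write (default is gzip today).
df.write()
  .format("hudi")
  .option("hoodie.table.name", "amazon_reviews")        // illustrative table name
  .option("hoodie.parquet.compression.codec", "zstd")   // assumed config key, check your Hudi version
  .mode("append")
  .save(basePath);{code}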

h4. Sorting

_Memory Allocated Total:_ 36,000 samples x 500 KB (sampling interval) ~= *18 GB*
{_}GC{_}: ~6%
 
_Observations (Memory)_
 * About *16%* is allocated by {{BinaryStatistics.updateStats}} in Parquet's {{ColumnWriterBase}}
 ** +Writing to a Parquet column as a whole allocates *~19%*; the actual write accounts for only ~3%, so *~80%* of that is overhead.+
 ** Allocating a {{HeapByteBuffer}} in {{Binary.toByteBuffer}} within {{PrimitiveComparator}} (!!!) while tracking min/max values for columns
 ** Created PARQUET-2106 / [PR#940|https://github.com/apache/parquet-mr/pull/940]
 * About *18%* is spent on {{bytesToAvro}} / {{avroToBytes}} conversion in 
calls to
 ** {{OverwriteWithLatestAvroPayload.getInsertValue}}
 ** {{OverwriteWithLatestAvroPayload.<init>}}
 * About 4% is allocated by fetching {{Path.getName}} in {{HoodieWrapperFileSystem.getBytesWritten}} (see the caching sketch after this list)
 ** Internally, Hadoop calls {{path.substring}}, allocating a new string every time
 * About *5%* of memory is allocated by {{DefaultSizeEstimator.sizeEstimate}}
 ** ~3% is in the ctor: every instance allocates {{private final Deque<Object> pending = new ArrayDeque<>(16 * 1024);}} by default
 ** The remaining ~2% is allocated while traversing the object tree
 *** Resizing hash-sets
 *** Fetching methods/fields through reflection (allocates arrays)
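Caching the name once per {{Path}} would remove the repeated {{substring}} allocations noted above; a rough illustration ({{NameCachingPath}} is a hypothetical wrapper for this sketch, not the actual Hudi change):
{code:java}
// Sketch only: compute Path.getName() once and reuse it, instead of letting
// Hadoop re-run the substring on every getBytesWritten() call.
import org.apache.hadoop.fs.Path;

public class NameCachingPath extends Path {  // hypothetical helper, for illustration
  private volatile String cachedName;

  public NameCachingPath(Path path) {
    super(path.toUri());
  }

  @Override
  public String getName() {
    String name = cachedName;
    if (name == null) {
      name = super.getName();  // Hadoop's substring-based lookup, done once
      cachedName = name;
    }
    return name;
  }
}{code}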

 
_Observations (CPU)_
 * About 30% of the time is spent waiting under locks within {{LinkedBlockingQueue}} in {{BoundedInMemoryQueue}}
 * About 10% is spent on parsing Spark's {{Row}} in {{HoodieSparkUtils.createRdd}}
 * About 2% of CPU wall time is spent on parsing Avro schemas

 
h4. Bulk-insert (sorted)

_Memory Allocated (Total):_ 45,000 samples x 500 KB ~= *22 GB*
_GC:_ *~23%*
 
Observations are similar to the [unordered bulk-insert|https://app.clickup.com/18029943/v/dc/h67bq-1900/h67bq-5880?block=block-3cfa6bf5-23bd-4e21-8a56-48fcb198b244] above
 
h2. Profiling

All profiles for these benchmarks have been taken using 
[async-profiler|https://github.com/jvm-profiling-tools/async-profiler].

 
{code:bash}
# CPU
PID=48449; EVENT=itimer; TS=$(date +%s); ./profiler.sh collect -e $EVENT -d 60 -f "profile_${PID}_${EVENT}_${TS}.html" $PID
# Memory
PID=<pid>; EVENT=alloc; TS=$(date +%s); ./profiler.sh collect -e $EVENT -d 60 -f "profile_${PID}_${EVENT}_${TS}.html" --alloc 500k $PID
{code}
 

  was:
These observations are from running Layout Optimization (Clustering) on a 
[small Amazon Reviews|https://s3.amazonaws.com/amazon-reviews-pds/readme.html] 
(4.5Gb, reduced) dataset
h2. *Major*
  * GC is taking up to *25%* of CPU cycles (a lot of churn)
 ** A lot of ArrayList resizing like in the code below

 
{code:java}
// Creating empty list, then immediately inserting
List<Object> values = new ArrayList<>();
values.addAll(JavaConverters.bufferAsJavaListConverter(row.toSeq().toBuffer()).asJava());
values.add(hilbertValue);{code}
 
 ** A lot of avoidable boxing like following

{code:java}
// Collecting as Longs, then unboxing into longs
List<Long> longList = fieldMap.entrySet().stream().map(...)
byte[] hilbertValue = HilbertCurveUtils.indexBytes(
    hilbertCurve, longList.stream().mapToLong(l -> l).toArray(), 63);{code}
 * Up to *20%* of wall-clock time is spent under locks in 
BoundedInMemoryExecutor (LinkedBlockingQueue)
 * ~35% of wall-clock time is spent Gzip-ing the output

h2. *Per stage*
 
Experiment could roughly be broken down into following stages
 * {_}Bulk-insert{_}: of the raw data into Hudi table
 * {_}Sorting{_}: re-sorting data according to the Layout Optimization config (reshuffling)
 * {_}Bulk-insert (of the sorted){_}: bulk inserting reshuffled data

 
h4. Bulk Insert
{_}Memory Allocated Total{_}: 22,000 samples x 500kb (sampling frequency) ~= 
11Gb
{_}GC{_}: 6%
 
_Observations_
 * *~30%* of CPU is spent on Gzip compression
 ** Created HUDI-2928 to flip default from gzip to zstd

h4. Sorting
_Memory Allocated Total:_ 36,000 samples x 500kb (sampling frequency) ~= *18Gb*
{_}GC{_}: ~6%
 
_Observations (Memory)_
 * About *16%* allocated by {{BinaryStatistics.updateStats}} in Parquet's {{ColumnWriterBase}}
 ** +Writing to Parquet column as a whole allocates *~19%*, ie the actual write allocates only 3% and *80%* of it is overhead.+
 ** Allocating {{HeapByteBuffer}} in {{Binary.toByteBuffer}} w/in 
{{PrimitiveComparator}} (!!!) accounting min/max values for columns
 ** Created PARQUET-2106 / 
[PR#940|https://github.com/apache/parquet-mr/pull/940]
 * About *18%* is spent on {{bytesToAvro}} / {{avroToBytes}} conversion in 
calls to
 ** {{OverwriteWithLatestAvroPayload.getInsertValue}}
 ** {{OverwriteWithLatestAvroPayload.<init>}}
 * About 4% is allocated in by fetching {{Path.getName}} 
{{HoodieWrapperFileSystem.getBytesWritten}}
 ** Internally Hadoop calls {{path.substring}} allocating new string every time
 * About *5%* of Memory is allocated by {{DefaultSizeEstimator.sizeEstimate}}
 ** ~3% is in ctor -- instance allocates by default:

 * private final Deque<Object> pending = new ArrayDeque<>(16 * 1024);
 * Remaining 2% are allocated while traversing the object tree
 ** Resizing hash-sets
 ** Fetching methods/fields through reflection (allocates arrays)

 
_Observations (CPU)_
 * About 30% of time is spent in waiting state under locks w/in {{LinkedBlockingQueue}} in {{BoundedInMemoryQueue}}
 * About 10% is spent on parsing Spark's {{Row}} in {{HoodieSparkUtils.createRdd}}
 * About 2% of the CPU wall time spent on parsing Avro schemas

 
 
h4. Bulk-insert (sorted)
Memory Allocated (Total): 45,000 samples x 500kb ~= *22Gb*
GC: *~23%*
 
Observations are similar to [unordered 
bulk-insert|https://app.clickup.com/18029943/v/dc/h67bq-1900/h67bq-5880?block=block-3cfa6bf5-23bd-4e21-8a56-48fcb198b244]
 
_CPU (sampled)_: profile_48449_itimer...ert_sorted.html
_Memory (sampled)_: profile_48449_alloc_...ert_sorted.html
 
 
h2. Results
 
After implementing the following optimizations:
 * Using patched parquet-column w/ PARQUET-2106 addressed
 * Avoiding unnecessary Avro ser/de loop (by using {{RewriteAvroPayload}})
 * Caching {{Path.getName}}
 * Avoiding unnecessary primitive types boxing

 
| |Bulk-insert _before/after_|Sorting _before/after_|Bulk-insert (sorted) _before/after_|
|Memory allocated|~11Gb / *~6.5Gb*|~18Gb / *~13Gb*|~22Gb / *~17Gb*|
 
*Total CPU Time (for all Spark tasks) / GC Total*
Before (gzip): 78min / 16min
After (gzip): 66min / 11min
After (gzip, JDK11): 60min / *2.9min*
After (zstd): 52 min / 10min
 
 
h2. Appendix: Profiling
All profiles for these benchmarks have been taken using 
[async-profiler|https://github.com/jvm-profiling-tools/async-profiler].
 
# CPU
PID=48449;EVENT=itimer;TS=$(date +%s); ./profiler.sh collect -e $EVENT -d 60 -f "profile_${PID}_${EVENT}_${TS}.html" $PID
# Memory
PID=<pid>;EVENT=alloc;TS=$(date +%s); ./profiler.sh collect -e $EVENT -d 60 -f "profile_${PID}_${EVENT}_${TS}.html" --alloc 500k $PID


> Benchmark Clustering performance
> --------------------------------
>
>                 Key: HUDI-2949
>                 URL: https://issues.apache.org/jira/browse/HUDI-2949
>             Project: Apache Hudi
>          Issue Type: Sub-task
>            Reporter: Alexey Kudinkin
>            Assignee: Alexey Kudinkin
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
