[
https://issues.apache.org/jira/browse/HUDI-8934?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Mark Bukhner updated HUDI-8934:
-------------------------------
Description:
Inspired by RFC-84 HUDI-8920: there is an opinion Avro is not the best choice
for Hudi. It requires an extra ser/de operations not only between Flink
operators (will be fixed by RFC-84).
I decided to benchmark a POC version with native Flink's RowData writer for
Hudi. It was simple enough, because Hudi already has native RowData to Parquet
writer used by append mode, I reused this writer to create log blocks and two
bottlenecks were found:
1. Hudi performs *a lot of Avro ser/de operations* in writer runtime.
2. Hudi stores Avro recrods as List<HoodieRecord>, it causes a *GC pressure* on
writer runtime, on my benchmarks garbage collection is about 30% of all hudi
writer runtime.
!profiler.png|width=751,height=389!
h3. Results:
As a result I reduced write time from ~4min to ~1min 20sec ({*}x3 write
performance boost{*}):
!results.png|width=744,height=172!
I have a POC version we are already testing in our cloud environment, key
improvements:
# Write native RowData to parquet log blocks (eliminate unnecessary ser/de)
# Recrods are stored in BinaryInMemorySortBuffer (reduce GC pressure)
# Records are sorted by new QuickSort().sort(sortBuffer) before writing log
block (maybe it's possible to preform data compaction without sorting? this
sort performs fast enough so it doesn't affect write performance.)
# RowDataStreamWriteFunction flushes bucket async (reduces previous operators
backpressure)
h3. My config:
PC: 32CPU 128GiB
Data: 60 million records of TPC-H lineitem table
Java: 17 openjdk
Flink: 1.20, Single JM + Sinlge TM, standalone, taskmanager.process.size: 8G
Write: Hadoop HDFS 3.3.1, 9 node cluster
Read: Kafka 2.8, 3 node cluster, 8 partitions
Hudi table:
'connector' = 'hudi',
'path' = '<hdfs_path>',
'table.type' = 'MERGE_ON_READ',
'metadata.enabled' = 'false',
'index.type'='BUCKET',
'hoodie.bucket.index.hash.field'='l_orderkey,l_linenumber',
'hoodie.bucket.index.num.buckets'='8',
'hoodie.parquet.compression.codec' = 'snappy',
*'hoodie.logfile.data.block.format' = 'parquet',*
*'hoodie.enable.fast.sort.write' = 'true',*
'write.operation' = 'upsert',
'write.batch.size'='256',
'write.tasks'='8',
'compaction.async.enabled' = 'false',
'clean.async.enabled' = 'false',
'hoodie.archive.automatic' = 'false',
'hoodie.clean.automatic' = 'false'
was:
Inspired by RFC-84 HUDI-8920: there is an opinion Avro is not the best choice
for Hudi. It requires an extra ser/de operations not only between Flink
operators (will be fixed by RFC-84).
I decided to benchmark a POC version with native Flink's RowData writer for
Hudi. It was simple enough, because Hudi already has native RowData to Parquet
writer used by append mode, I reused this writer to create log blocks and two
bottlenecks were found:
1. Hudi performs *a lot of Avro ser/de operations* in writer runtime.
2. Hudi stores Avro recrods as List<HoodieRecord>, it causes a *GC pressure* on
writer runtime, on my benchmarks garbage collection is about 30% of all hudi
writer runtime.
!profiler.png!
h3. Results:
As a result I reduced write time from ~4min to ~1min 20sec ({*}x3 write
performance boost{*}):
!results.png!
I have a POC version we are already testing in our cloud environment, key
improvements:
# Write native RowData to parquet log blocks (eliminate unnecessary ser/de)
# Recrods are stored in BinaryInMemorySortBuffer (reduce GC pressure)
# Records are sorted by new QuickSort().sort(sortBuffer) before writing log
block (maybe it's possible to preform data compaction without sorting? this
sort performs fast enough so it doesn't affect write performance.)
# RowDataStreamWriteFunction flushes bucket async (reduces previous operators
backpressure)
h3. My config:
PC: 32CPU 128GiB
Data: 60 million records of TPC-H lineitem table
Java: 17 openjdk
Flink: 1.20, Single JM + Sinlge TM, standalone, taskmanager.process.size: 8G
Write: Hadoop HDFS 3.3.1, 9 node cluster
Read: Kafka 2.8, 3 node cluster, 8 partitions
Hudi table:
'connector' = 'hudi',
'path' = '<hdfs_path>',
'table.type' = 'MERGE_ON_READ',
'metadata.enabled' = 'false',
'index.type'='BUCKET',
'hoodie.bucket.index.hash.field'='l_orderkey,l_linenumber',
'hoodie.bucket.index.num.buckets'='8',
'hoodie.parquet.compression.codec' = 'snappy',
*'hoodie.logfile.data.block.format' = 'parquet',*
*'hoodie.enable.fast.sort.write' = 'true',*
'write.operation' = 'upsert',
'write.batch.size'='256',
'write.tasks'='8',
'compaction.async.enabled' = 'false',
'clean.async.enabled' = 'false',
'hoodie.archive.automatic' = 'false',
'hoodie.clean.automatic' = 'false'
> [RFC-87] Avro elimination for Flink writer
> ------------------------------------------
>
> Key: HUDI-8934
> URL: https://issues.apache.org/jira/browse/HUDI-8934
> Project: Apache Hudi
> Issue Type: New Feature
> Components: flink, performance
> Reporter: Mark Bukhner
> Assignee: Mark Bukhner
> Priority: Major
> Attachments: profiler.png, results.png
>
>
> Inspired by RFC-84 HUDI-8920: there is an opinion Avro is not the best choice
> for Hudi. It requires an extra ser/de operations not only between Flink
> operators (will be fixed by RFC-84).
> I decided to benchmark a POC version with native Flink's RowData writer for
> Hudi. It was simple enough, because Hudi already has native RowData to
> Parquet writer used by append mode, I reused this writer to create log blocks
> and two bottlenecks were found:
> 1. Hudi performs *a lot of Avro ser/de operations* in writer runtime.
> 2. Hudi stores Avro recrods as List<HoodieRecord>, it causes a *GC pressure*
> on writer runtime, on my benchmarks garbage collection is about 30% of all
> hudi writer runtime.
> !profiler.png|width=751,height=389!
> h3. Results:
> As a result I reduced write time from ~4min to ~1min 20sec ({*}x3 write
> performance boost{*}):
> !results.png|width=744,height=172!
> I have a POC version we are already testing in our cloud environment, key
> improvements:
> # Write native RowData to parquet log blocks (eliminate unnecessary ser/de)
> # Recrods are stored in BinaryInMemorySortBuffer (reduce GC pressure)
> # Records are sorted by new QuickSort().sort(sortBuffer) before writing log
> block (maybe it's possible to preform data compaction without sorting? this
> sort performs fast enough so it doesn't affect write performance.)
> # RowDataStreamWriteFunction flushes bucket async (reduces previous
> operators backpressure)
> h3. My config:
> PC: 32CPU 128GiB
> Data: 60 million records of TPC-H lineitem table
> Java: 17 openjdk
> Flink: 1.20, Single JM + Sinlge TM, standalone, taskmanager.process.size: 8G
> Write: Hadoop HDFS 3.3.1, 9 node cluster
> Read: Kafka 2.8, 3 node cluster, 8 partitions
> Hudi table:
> 'connector' = 'hudi',
> 'path' = '<hdfs_path>',
> 'table.type' = 'MERGE_ON_READ',
> 'metadata.enabled' = 'false',
> 'index.type'='BUCKET',
> 'hoodie.bucket.index.hash.field'='l_orderkey,l_linenumber',
> 'hoodie.bucket.index.num.buckets'='8',
> 'hoodie.parquet.compression.codec' = 'snappy',
> *'hoodie.logfile.data.block.format' = 'parquet',*
> *'hoodie.enable.fast.sort.write' = 'true',*
> 'write.operation' = 'upsert',
> 'write.batch.size'='256',
> 'write.tasks'='8',
> 'compaction.async.enabled' = 'false',
> 'clean.async.enabled' = 'false',
> 'hoodie.archive.automatic' = 'false',
> 'hoodie.clean.automatic' = 'false'
--
This message was sent by Atlassian Jira
(v8.20.10#820010)