[ 
https://issues.apache.org/jira/browse/HUDI-8934?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mark Bukhner updated HUDI-8934:
-------------------------------
    Status: In Progress  (was: Open)

> [RFC-87] Avro elimination for Flink writer
> ------------------------------------------
>
>                 Key: HUDI-8934
>                 URL: https://issues.apache.org/jira/browse/HUDI-8934
>             Project: Apache Hudi
>          Issue Type: New Feature
>          Components: flink, performance
>            Reporter: Mark Bukhner
>            Assignee: Mark Bukhner
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.1.0
>
>         Attachments: profiler.png, results.png
>
>
> Inspired by RFC-84 (HUDI-8920): there is an opinion that Avro is not the best 
> choice for Hudi. It requires extra ser/de operations, and not only between 
> Flink operators (that part will be fixed by RFC-84).
> I decided to benchmark a POC version with Flink's native RowData writer for 
> Hudi. This was simple enough, because Hudi already has a native RowData to 
> Parquet writer used by append mode. I reused this writer to create log blocks 
> and found two bottlenecks:
> 1. Hudi performs *a lot of Avro ser/de operations* in writer runtime.
> 2. Hudi stores Avro records as List<HoodieRecord>, which causes *GC pressure* 
> in the writer runtime. In my benchmarks, garbage collection takes about 30% of 
> the total Hudi writer runtime.
> !profiler.png|width=876,height=454!
> h3. Results:
> As a result, I reduced write time from ~4 min to ~1 min 20 sec ({*}3x write 
> performance boost{*}):
> !results.png|width=869,height=201!
> I have a POC version that we are already testing in our cloud environment. Key 
> improvements:
>  # This POC is based on RFC-84 and already contains the POC from RFC-84
>  # Write native RowData to parquet log blocks (eliminates unnecessary ser/de)
>  # Records are stored in BinaryInMemorySortBuffer (reduces GC pressure)
>  # Records are sorted by new QuickSort().sort(sortBuffer) before writing the 
> log block (maybe it's possible to perform data compaction without sorting? 
> This sort is fast enough that it doesn't affect write performance.)
>  # RowDataStreamWriteFunction flushes buckets asynchronously (reduces 
> backpressure on upstream operators)
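
Improvements 3 and 4 in the list above can be pictured with a small sketch. It is not Flink's BinaryInMemorySortBuffer or QuickSort. The class name PackedSortBufferSketch, the record layout (8-byte key, 4-byte length, payload bytes) and all method names are assumptions made only for illustration. What it tries to show is the general idea: records are serialized into one preallocated byte array and only an array of int offsets is sorted, so the garbage collector sees a handful of long-lived objects instead of one object per record.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical illustration only; not the Flink or Hudi implementation.
final class PackedSortBufferSketch {
    // Assumed layout per record: 8-byte key, 4-byte payload length, payload bytes.
    private final ByteBuffer data;
    private int[] offsets = new int[16]; // start position of each record in `data`
    private int count = 0;

    PackedSortBufferSketch(int capacityBytes) {
        this.data = ByteBuffer.allocate(capacityBytes);
    }

    /** Appends a record; returns false when the buffer is full and should be flushed. */
    boolean write(long key, byte[] payload) {
        if (data.remaining() < Long.BYTES + Integer.BYTES + payload.length) {
            return false;
        }
        if (count == offsets.length) {
            offsets = Arrays.copyOf(offsets, count * 2);
        }
        offsets[count++] = data.position();
        data.putLong(key).putInt(payload.length).put(payload);
        return true;
    }

    int size() {
        return count;
    }

    long key(int i) {
        return data.getLong(offsets[i]);
    }

    String payload(int i) {
        int lengthPos = offsets[i] + Long.BYTES;
        int length = data.getInt(lengthPos);
        return new String(data.array(), lengthPos + Integer.BYTES, length, StandardCharsets.UTF_8);
    }

    /** Sorts by key. Only the offsets move; the record bytes stay where they are. */
    void sort() {
        quickSort(0, count - 1);
    }

    /** Clears the buffer for reuse after a flush, without allocating new memory. */
    void reset() {
        data.clear();
        count = 0;
    }

    private void quickSort(int lo, int hi) {
        while (lo < hi) {
            long pivot = key(lo + (hi - lo) / 2);
            int i = lo;
            int j = hi;
            while (i <= j) {
                while (key(i) < pivot) i++;
                while (key(j) > pivot) j--;
                if (i <= j) {
                    int tmp = offsets[i];
                    offsets[i] = offsets[j];
                    offsets[j] = tmp;
                    i++;
                    j--;
                }
            }
            // Recurse into the smaller part, loop over the larger one.
            if (j - lo < hi - i) {
                quickSort(lo, j);
                lo = i;
            } else {
                quickSort(i, hi);
                hi = j;
            }
        }
    }

    public static void main(String[] args) {
        PackedSortBufferSketch buffer = new PackedSortBufferSketch(1024);
        buffer.write(42L, "row-c".getBytes(StandardCharsets.UTF_8));
        buffer.write(7L, "row-a".getBytes(StandardCharsets.UTF_8));
        buffer.write(19L, "row-b".getBytes(StandardCharsets.UTF_8));
        buffer.sort();
        for (int i = 0; i < buffer.size(); i++) {
            System.out.println(buffer.key(i) + " -> " + buffer.payload(i));
        }
    }
}
```

In this sketch a full buffer is signalled by write returning false, at which point a caller would sort, emit the records in key order and call reset. How the POC actually handles a full buffer is not stated in the description.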
> h3. My config:
> PC: 32CPU 128GiB
> Data: 60 million records of TPC-H lineitem table
> Java: 17 openjdk
> Flink: 1.20, Single JM + Single TM, standalone, taskmanager.process.size: 8G
> Write: Hadoop HDFS 3.3.1, 9 node cluster
> Read: Kafka 2.8, 3 node cluster, 8 partitions
> Hudi table:
>     'connector' = 'hudi',
>     'path' = '<hdfs_path>',
>     'table.type' = 'MERGE_ON_READ',
>     'metadata.enabled' = 'false',
>     'index.type' = 'BUCKET',
>     'hoodie.bucket.index.hash.field' = 'l_orderkey,l_linenumber',
>     'hoodie.bucket.index.num.buckets' = '8',
>     'hoodie.parquet.compression.codec' = 'snappy',
>     *'hoodie.logfile.data.block.format' = 'parquet',*
>     *'hoodie.enable.fast.sort.write' = 'true',*
>     'write.operation' = 'upsert',
>     'write.batch.size' = '256',
>     'write.tasks' = '8',
>     'compaction.async.enabled' = 'false',
>     'clean.async.enabled' = 'false',
>     'hoodie.archive.automatic' = 'false',
>     'hoodie.clean.automatic' = 'false'
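
The bucket index options in the table definition above (hash fields l_orderkey,l_linenumber and 8 buckets) imply that each record is routed to one of 8 buckets by hashing its key fields. A minimal sketch of that routing follows. The formula used here, a non-negative hash of the key values modulo the bucket count, is an assumption for illustration, and Hudi's real hashing may differ. The class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of hash-based bucket routing; not Hudi code.
final class BucketAssignmentSketch {

    /** Assumed scheme: clear the sign bit of the hash, then take it modulo the bucket count. */
    static int bucketId(List<String> hashFieldValues, int numBuckets) {
        return (hashFieldValues.hashCode() & Integer.MAX_VALUE) % numBuckets;
    }

    /** Counts how many synthetic (l_orderkey, l_linenumber) keys land in each bucket. */
    static int[] distribution(int orders, int linesPerOrder, int numBuckets) {
        int[] counts = new int[numBuckets];
        for (long orderKey = 1; orderKey <= orders; orderKey++) {
            for (int line = 1; line <= linesPerOrder; line++) {
                List<String> key = List.of(Long.toString(orderKey), Integer.toString(line));
                counts[bucketId(key, numBuckets)]++;
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // 8 buckets as in 'hoodie.bucket.index.num.buckets'; the key values are synthetic.
        System.out.println(Arrays.toString(distribution(1000, 4, 8)));
    }
}
```

Because the same key always maps to the same bucket, an upsert for an existing key goes to the file group that already holds it, without a separate index lookup.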



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
