Alowator commented on code in PR #12960:
URL: https://github.com/apache/hudi/pull/12960#discussion_r1991287141
##########
rfc/rfc-87/rfc-87.md:
##########
@@ -24,9 +24,329 @@
 ## Approvers
 - @danny0405
-- @xiarixiaoyao
-- @yuzhaojing
+- @cshuo
 
 ## Status: Claim
-JIRA: [HUDI-8934](https://issues.apache.org/jira/browse/HUDI-8934)
+Umbrella ticket: [HUDI-9075](https://issues.apache.org/jira/browse/HUDI-9075)
+
+## Abstract
+
+Building on RFC-84, which removed Avro from Flink’s pre-write operators, RFC-87 eliminates Avro from the write path to improve performance.
+Current writes suffer from excessive Avro serialization/deserialization and from in-memory storage of List<HoodieRecord>, causing high GC overhead.
+This RFC replaces DataBucket’s list storage with Flink’s BinaryInMemorySortBuffer, enabling efficient sorting and iterator-based writes.
+HoodieLogBlock is also refactored to separate deserialization from buffering.
+Precombine-field deduplication will now occur after sorting.
+
+By removing Avro, this RFC reduces GC pressure and minimizes serialization overhead, thereby improving Flink write performance in Hudi.
+
+## Background
+
+Currently, Flink writes in Hudi rely heavily on Avro for serialization, deserialization, and in-memory storage, particularly in DataBucket and AppendHandle.
+This approach introduces unnecessary GC pressure, frequent avroToBytes/bytesToAvro conversions, and overall performance bottlenecks.
+
+With RFC-84, Avro was eliminated from Flink’s pre-write operators. However, the write path still suffers from excessive Avro-based processing, leading to inefficiencies.
+StreamWriteFunction now receives HoodieFlinkInternalRow, which already encapsulates Flink’s native RowData, making it possible to remove Avro entirely from the write phase.
+
+## Implementation
+
+The key ideas of the Flink-native implementation are realized in the rework of DataBucket and the Flink write handles.
+This implementation touches neither bulk_insert nor append mode and works for all index types.
+It changes only the StreamWriteFunction logic.
+
+### DataBucket
+
+To reduce GC pressure and eliminate Avro usage, DataBucket could store RowData records in a BinaryInMemorySortBuffer.
+Instead of getRecords(), DataBucket could provide a getIterator() method whose result is passed to the Flink write handles.
+MutableObjectIterator is a Flink @Internal class, so it is better to create a Hudi-internal Iterator that encapsulates the MutableObjectIterator logic.
+
+```Java
+protected static class DataBucket {
+  private final BinaryInMemorySortBuffer sortBuffer;
+  private final BufferSizeDetector detector;
+
+  private DataBucket(Double batchSize, RowType rowType) {
+    this.sortBuffer = SortBufferFactory.createInMemorySortBuffer(rowType);
+    this.detector = new BufferSizeDetector(batchSize);
+  }
+
+  public MutableObjectIterator<BinaryRowData> getIterator() {
+    return this.sortBuffer.getIterator();
+  }
+
+  public boolean isEmpty() {
+    return sortBuffer.isEmpty();
+  }
+
+  public void reset() {
+    this.sortBuffer.reset();
+    this.detector.reset();
+  }
+
Review Comment:
   If we perform sorting, we need to add at least the Hoodie record key.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
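The RFC proposes a Hudi-internal Iterator that encapsulates Flink's `@Internal` `MutableObjectIterator` (whose `next()` returns `null` when the data is exhausted). A minimal sketch of that adapter pattern, using a simplified local stand-in for the Flink interface rather than the real `org.apache.flink.util.MutableObjectIterator`; all class names here are illustrative, not the actual Hudi implementation:

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

public class RecordIteratorSketch {
  // Simplified stand-in for Flink's @Internal MutableObjectIterator:
  // next() returns the next element, or null once the source is exhausted.
  interface MutableObjectIterator<E> {
    E next() throws Exception;
  }

  // Hypothetical Hudi-internal wrapper that hides MutableObjectIterator
  // behind the standard java.util.Iterator contract via one-element look-ahead,
  // since MutableObjectIterator has no hasNext() of its own.
  static final class RecordIterator<E> implements Iterator<E> {
    private final MutableObjectIterator<E> delegate;
    private E lookahead;

    RecordIterator(MutableObjectIterator<E> delegate) {
      this.delegate = delegate;
      advance();
    }

    private void advance() {
      try {
        lookahead = delegate.next(); // null signals end of data
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    }

    @Override
    public boolean hasNext() {
      return lookahead != null;
    }

    @Override
    public E next() {
      if (lookahead == null) {
        throw new NoSuchElementException();
      }
      E current = lookahead;
      advance();
      return current;
    }
  }

  public static void main(String[] args) {
    Iterator<String> source = java.util.List.of("a", "b").iterator();
    RecordIterator<String> it =
        new RecordIterator<>(() -> source.hasNext() ? source.next() : null);
    StringBuilder sb = new StringBuilder();
    while (it.hasNext()) {
      sb.append(it.next());
    }
    System.out.println(sb); // prints "ab"
  }
}
```

The look-ahead in the constructor is what lets `hasNext()` answer without consuming an extra element, which is the usual way to adapt a null-terminated pull iterator to `java.util.Iterator`.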

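The Abstract notes that precombine-field deduplication will occur after sorting. Once the buffer is sorted by record key, dedup reduces to a single pass that compares adjacent rows and keeps, per key group, the row with the largest precombine value. A sketch under that assumption, with a hypothetical `Row` type standing in for sorted binary rows (none of these names come from the Hudi codebase):

```java
import java.util.ArrayList;
import java.util.List;

public class PostSortDedupSketch {
  // Hypothetical flattened row: (recordKey, precombine ordering value, payload).
  record Row(String key, long precombine, String payload) {}

  // Single pass over rows already sorted by record key: among adjacent rows
  // sharing a key, keep the one with the largest precombine value.
  static List<Row> deduplicate(List<Row> sortedRows) {
    List<Row> out = new ArrayList<>();
    Row winner = null;
    for (Row row : sortedRows) {
      if (winner == null || !winner.key().equals(row.key())) {
        if (winner != null) {
          out.add(winner); // key group ended; emit its winner
        }
        winner = row;
      } else if (row.precombine() >= winner.precombine()) {
        winner = row; // larger (or equal, i.e. later) precombine wins
      }
    }
    if (winner != null) {
      out.add(winner);
    }
    return out;
  }

  public static void main(String[] args) {
    List<Row> sorted = List.of(
        new Row("k1", 1, "a"),
        new Row("k1", 3, "b"),
        new Row("k2", 2, "c"));
    // k1 keeps the precombine=3 row; k2 has a single row.
    System.out.println(deduplicate(sorted));
  }
}
```

Doing this on the sorted iterator keeps the pass O(n) with O(1) extra state per key group, which is why moving dedup after sorting pairs naturally with the BinaryInMemorySortBuffer change.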