cshuo commented on code in PR #12960:
URL: https://github.com/apache/hudi/pull/12960#discussion_r1992852521


##########
rfc/rfc-87/rfc-87.md:
##########
@@ -24,9 +24,329 @@
 ## Approvers
 
 - @danny0405
-- @xiarixiaoyao
-- @yuzhaojing
+- @cshuo
 
 ## Status: Claim
 
-JIRA: [HUDI-8934](https://issues.apache.org/jira/browse/HUDI-8934)
+Umbrella ticket: [HUDI-9075](https://issues.apache.org/jira/browse/HUDI-9075)
+
+## Abstract
+
+Building on RFC-84, which removed Avro from Flink’s pre-write operators, RFC-87 eliminates Avro from the write path to improve performance.
+Current writes suffer from excessive Avro serialization/deserialization and in-memory storage of List<HoodieRecord>, causing high GC overhead.
+This RFC replaces DataBucket’s list storage with Flink’s BinaryInMemorySortBuffer, enabling efficient sorting and iterator-based writes.
+HoodieLogBlock is also refactored to separate deserialization from buffering.
+Precombine field deduplication will now occur after sorting.
+
+By removing Avro, this RFC reduces GC pressure and serialization overhead, and as a result improves Flink write performance in Hudi.
+
+## Background
+
+Currently, Flink writes in Hudi rely heavily on Avro for serialization, deserialization, and in-memory storage, particularly in DataBucket and AppendHandle.
+This approach introduces unnecessary GC pressure, frequent avroToBytes/bytesToAvro conversions, and overall performance bottlenecks.
+
+With RFC-84, Avro was eliminated from Flink’s pre-write operators. However, the write path still suffers from excessive Avro-based processing, leading to inefficiencies.
+StreamWriteFunction now receives HoodieFlinkInternalRow, which already encapsulates Flink’s native RowData, making it possible to remove Avro entirely from the write phase.
+
+## Implementation
+
+The key ideas of the Flink-native implementation are realized in the rework of DataBucket and the Flink write handles.
+This implementation touches neither bulk_insert nor append mode and works for all index types.
+It changes only the StreamWriteFunction logic.
+
+### DataBucket
+
+To reduce GC pressure and eliminate Avro usage, DataBucket could store RowData records in BinaryInMemorySortBuffer.
+Instead of getRecords(), DataBucket could provide a getIterator() method, which will be passed to the Flink write handles.
+MutableObjectIterator is a Flink @Internal class, so it's better to create a Hudi-internal iterator that encapsulates MutableObjectIterator's logic; a sketch of such a wrapper follows the DataBucket class below.
+
+```Java
+protected static class DataBucket {
+  private final BinaryInMemorySortBuffer sortBuffer;
+  private final BufferSizeDetector detector;
+
+  private DataBucket(Double batchSize, RowType rowType) {
+    this.sortBuffer = SortBufferFactory.createInMemorySortBuffer(rowType);
+    this.detector = new BufferSizeDetector(batchSize);
+  }
+  
+  public MutableObjectIterator<BinaryRowData> getIterator() {
+    return this.sortBuffer.getIterator();
+  }
+  
+  public boolean isEmpty() {
+    return sortBuffer.isEmpty();
+  }
+  
+  public void reset() {
+    this.sortBuffer.reset();
+    this.detector.reset();
+  }
+
+  public void add(HoodieFlinkInternalRow record) throws IOException {
+    this.sortBuffer.write(record.getRowDataWithMetadata());
+  }
+}
+```
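+
+Below is a minimal sketch of such a wrapper, assuming illustrative names (SortBufferIterator and its exact shape are not part of the final API):
+
+```Java
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.util.MutableObjectIterator;
+
+/** Hudi-internal iterator that hides Flink's @Internal MutableObjectIterator. */
+public class SortBufferIterator implements Iterator<BinaryRowData> {
+  private final MutableObjectIterator<BinaryRowData> delegate;
+  private BinaryRowData nextRow;
+
+  public SortBufferIterator(MutableObjectIterator<BinaryRowData> delegate) throws IOException {
+    this.delegate = delegate;
+    // Pre-fetch one record: MutableObjectIterator signals exhaustion with null.
+    this.nextRow = delegate.next();
+  }
+
+  @Override
+  public boolean hasNext() {
+    return nextRow != null;
+  }
+
+  @Override
+  public BinaryRowData next() {
+    if (nextRow == null) {
+      throw new NoSuchElementException();
+    }
+    BinaryRowData current = nextRow;
+    try {
+      nextRow = delegate.next();
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+    return current;
+  }
+}
+```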
+
+BinaryInMemorySortBuffer could be provided by a SortBufferFactory implementation.
+
+```Java
+class SortBufferFactory {
+
+  private static final int DEFAULT_PAGE_SIZE = 65536;
+
+  static BinaryInMemorySortBuffer createInMemorySortBuffer(RowType rowType) {
+    NormalizedKeyComputer normalizedKeyComputer = new NormalizedRecordKeyComputer();
+    RecordComparator keyComparator = new NormalizedRecordKeyComputer();
+
+    RowDataSerializer serializer = new RowDataSerializer(rowType);
+    BinaryRowDataSerializer binarySerializer = new BinaryRowDataSerializer(rowType.getFieldCount());
+
+    MemorySegmentPool unlimitedMemoryPool = new UnlimitedHeapMemorySegmentPool(DEFAULT_PAGE_SIZE);
+
+    return BinaryInMemorySortBuffer.createBuffer(normalizedKeyComputer, serializer, binarySerializer,
+        keyComparator, unlimitedMemoryPool);

Review Comment:
   To use managed memory, operators may first declare a certain amount of memory via `Transformation#declareManagedMemoryUseCaseAtOperatorScope`; otherwise memory contention may lead to unexpected behavior, such as failures of state operations.
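
   A minimal sketch of such a declaration (the OPERATOR use case and the weight of 100 are illustrative choices, not something this PR prescribes):

   ```Java
   import org.apache.flink.core.memory.ManagedMemoryUseCase;
   import org.apache.flink.streaming.api.datastream.DataStream;

   public class WriteOperatorMemoryDeclaration {
     static <T> DataStream<T> declareManagedMemory(DataStream<T> writeStream) {
       // Reserve operator-scoped managed memory for the sort buffer. The weight
       // is relative: Flink splits a slot's managed memory among all declared
       // use cases in proportion to their weights.
       writeStream.getTransformation()
           .declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.OPERATOR, 100);
       return writeStream;
     }
   }
   ```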



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to