Alowator commented on code in PR #12960:
URL: https://github.com/apache/hudi/pull/12960#discussion_r1992796291


##########
rfc/rfc-87/rfc-87.md:
##########
@@ -24,9 +24,329 @@
 ## Approvers
 
 - @danny0405
-- @xiarixiaoyao
-- @yuzhaojing
+- @cshuo
 
 ## Status: Claim
 
-JIRA: [HUDI-8934](https://issues.apache.org/jira/browse/HUDI-8934)
+Umbrella ticket: [HUDI-9075](https://issues.apache.org/jira/browse/HUDI-9075)
+
+## Abstract
+
+Building on RFC-84, which removed Avro from Flink’s pre-write operators, RFC-87 eliminates Avro from the write path to improve performance.
+Current writes suffer from excessive Avro serialization/deserialization and in-memory storage of List<HoodieRecord>, causing high GC overhead.
+This RFC replaces DataBucket’s list storage with Flink’s BinaryInMemorySortBuffer, enabling efficient sorting and iterator-based writes.
+HoodieLogBlock is also refactored to separate deserialization from buffering.
+Precombine field deduplication will now occur after sorting.
+
+By removing Avro, this RFC reduces GC pressure and minimizes serialization overhead, and as a result improves Flink write performance in Hudi.
+
+## Background
+
+Currently, Flink writes in Hudi rely heavily on Avro for serialization, deserialization, and in-memory storage, particularly in DataBucket and AppendHandle.
+This approach introduces unnecessary GC pressure, frequent avroToBytes/bytesToAvro conversions, and overall performance bottlenecks.
+
+With RFC-84, Avro was eliminated from Flink’s pre-write operators. However, the write path still suffers from excessive Avro-based processing, leading to inefficiencies.
+StreamWriteFunction now receives HoodieFlinkInternalRow, which already encapsulates Flink’s native RowData, making it possible to remove Avro entirely from the write phase.
+
+## Implementation
+
+The key ideas of the Flink-native implementation are a rework of DataBucket and of the Flink write handles.
+This implementation touches neither bulk_insert nor append mode, and it works for all index types.
+It changes only StreamWriteFunction logic.
+
+### DataBucket
+
+To reduce GC pressure and eliminate Avro usage, DataBucket could store RowData records in a BinaryInMemorySortBuffer.
+Instead of getRecords(), DataBucket could provide a getIterator() method, whose result will be passed to the Flink write handles.
+MutableObjectIterator is a Flink @Internal class, so it's better to create a Hudi-internal iterator that encapsulates MutableObjectIterator's logic (a possible sketch is shown after the DataBucket example below).
+
+```Java
+protected static class DataBucket {
+  private final BinaryInMemorySortBuffer sortBuffer;
+  private final BufferSizeDetector detector;
+
+  private DataBucket(Double batchSize, RowType rowType) {
+    this.sortBuffer = SortBufferFactory.createInMemorySortBuffer(rowType);
+    this.detector = new BufferSizeDetector(batchSize);
+  }
+  
+  public MutableObjectIterator<BinaryRowData> getIterator() {
+    return this.sortBuffer.getIterator();
+  }
+  
+  public boolean isEmpty() {
+    return sortBuffer.isEmpty();
+  }
+  
+  public void reset() {
+    this.sortBuffer.reset();
+    this.detector.reset();
+  }
+
+  public void add(HoodieFlinkInternalRow record) throws IOException {
+    this.sortBuffer.write(record.getRowDataWithMetadata());
+  }
+}
+```
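+
+The getIterator() call above exposes Flink's @Internal MutableObjectIterator. A minimal sketch of the Hudi-internal wrapper mentioned earlier is shown below; the class name BinaryRowIterator is illustrative, and it assumes MutableObjectIterator#next() (the no-reuse variant) returns a fresh row per call and null once the buffer is exhausted.
+
+```Java
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.util.MutableObjectIterator;
+
+/** Sketch only: Hudi-internal iterator hiding Flink's @Internal MutableObjectIterator. */
+public class BinaryRowIterator implements Iterator<BinaryRowData> {
+  private final MutableObjectIterator<BinaryRowData> delegate;
+  private BinaryRowData next;
+
+  public BinaryRowIterator(MutableObjectIterator<BinaryRowData> delegate) {
+    this.delegate = delegate;
+    this.next = fetch();
+  }
+
+  private BinaryRowData fetch() {
+    try {
+      // next() without a reuse object returns a fresh BinaryRowData, or null at the end.
+      return delegate.next();
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+
+  @Override
+  public boolean hasNext() {
+    return next != null;
+  }
+
+  @Override
+  public BinaryRowData next() {
+    if (next == null) {
+      throw new NoSuchElementException();
+    }
+    BinaryRowData current = next;
+    next = fetch();
+    return current;
+  }
+}
+```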
+
+The BinaryInMemorySortBuffer could be provided by a SortBufferFactory implementation.
+
+```Java
+class SortBufferFactory {
+
+  private static final int DEFAULT_PAGE_SIZE = 65536;
+
+  static BinaryInMemorySortBuffer createInMemorySortBuffer(RowType rowType) {
+    NormalizedKeyComputer normalizedKeyComputer = new NormalizedRecordKeyComputer();
+    RecordComparator keyComparator = new NormalizedRecordKeyComputer();
+
+    RowDataSerializer serializer = new RowDataSerializer(rowType);
+    BinaryRowDataSerializer binarySerializer = new BinaryRowDataSerializer(rowType.getFieldCount());
+
+    MemorySegmentPool unlimitedMemoryPool = new UnlimitedHeapMemorySegmentPool(DEFAULT_PAGE_SIZE);
+
+    return BinaryInMemorySortBuffer.createBuffer(normalizedKeyComputer, serializer, binarySerializer,
+        keyComparator, unlimitedMemoryPool);
+  }
+}
+
+```
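+
+UnlimitedHeapMemorySegmentPool appears to be a new Hudi helper rather than an existing Flink class. One possible shape, assuming it simply allocates heap pages on demand and recycles returned ones (imports assume Flink's table-runtime MemorySegmentPool interface), is sketched below.
+
+```Java
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.List;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.table.runtime.util.MemorySegmentPool;
+
+/** Sketch only: an unbounded heap-backed pool so the sort buffer never runs out of pages. */
+public class UnlimitedHeapMemorySegmentPool implements MemorySegmentPool {
+  private final int pageSize;
+  private final Deque<MemorySegment> cache = new ArrayDeque<>();
+
+  public UnlimitedHeapMemorySegmentPool(int pageSize) {
+    this.pageSize = pageSize;
+  }
+
+  @Override
+  public MemorySegment nextSegment() {
+    // Reuse a returned page if one is cached, otherwise allocate a new heap page.
+    return cache.isEmpty() ? MemorySegmentFactory.wrap(new byte[pageSize]) : cache.poll();
+  }
+
+  @Override
+  public int pageSize() {
+    return pageSize;
+  }
+
+  @Override
+  public void returnAll(List<MemorySegment> memory) {
+    cache.addAll(memory);
+  }
+
+  @Override
+  public int freePages() {
+    // "Unlimited": report a huge number of free pages so callers never trigger spilling.
+    return Integer.MAX_VALUE;
+  }
+}
+```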
+
+### NormalizedRecordKeyComputer
+
+There is no requirement to generate a NormalizedRecordKeyComputer, because _hoodie_record_key can always be used.
+
+```Java
+/**
+ * Computes normalized keys for records, used in sorting operations.
+ * It uses {@link FastRecordKeyComparator} for comparing and transforming record keys
+ * into normalized byte representations.
+ */
+public class NormalizedRecordKeyComputer implements NormalizedKeyComputer, RecordComparator {
+
+  private final FastRecordKeyComparator comparator = new FastRecordKeyComparator();
+
+  /** Number of bytes allocated for the normalized key. */
+  private static final int NUM_KEY_BYTES = 8;
+
+  private static final int RECORD_KEY_POS = HoodieRecord.HoodieMetadataField.RECORD_KEY_METADATA_FIELD.ordinal();
+
+  @Override
+  public void putKey(RowData record, MemorySegment target, int offset) {
+    if (record.isNullAt(RECORD_KEY_POS)) {
+      SortUtil.minNormalizedKey(target, offset, NUM_KEY_BYTES);
+    } else {
+      byte[] hoodieKeyBytes = comparator.getRecordKeyBytes(record.getString(RECORD_KEY_POS).toString());
+      int limit = offset + NUM_KEY_BYTES;
+
+      int i;
+      for (i = 0; i < hoodieKeyBytes.length && offset < limit; ++i) {
+        target.put(offset++, hoodieKeyBytes[i]);
+      }
+      for (i = offset; i < limit; ++i) {
+        target.put(i, (byte) 0);
+      }
+    }
+  }
+
+  @Override
+  public int compareKey(MemorySegment segI, int offsetI, MemorySegment segJ, int offsetJ) {
+    byte[] a = new byte[NUM_KEY_BYTES];
+    byte[] b = new byte[NUM_KEY_BYTES];
+    segI.get(offsetI, a, 0, NUM_KEY_BYTES);
+    segJ.get(offsetJ, b, 0, NUM_KEY_BYTES);
+    return comparator.compareBytesWithoutColumnName(a, b);
+  }
+
+  /**
+   * This method implements the RecordComparator logic so that the common {@link FastRecordKeyComparator}
+   * is used for both serialized and deserialized representations of the record key.
+   */
+  @Override
+  public int compare(RowData a, RowData b) {
+    return comparator.compare(
+        a.getString(RECORD_KEY_POS).toString(),
+        b.getString(RECORD_KEY_POS).toString());
+  }
+
+  @Override
+  public void swapKey(MemorySegment segI, int offsetI, MemorySegment segJ, int offsetJ) {
+    byte[] temp = new byte[NUM_KEY_BYTES];
+    segI.swapBytes(temp, segJ, offsetI, offsetJ, NUM_KEY_BYTES);
+  }
+
+  @Override
+  public int getNumKeyBytes() {
+    return NUM_KEY_BYTES;
+  }
+
+  @Override
+  public boolean isKeyFullyDetermines() {
+    return false;
+  }
+
+  @Override
+  public boolean invertKey() {
+    return false;
+  }
+}
+```
+
+Comparing records using _hoodie_record_key has a problem: for ComplexKeyGenerator, _hoodie_record_key starts with the column name.
+This makes BinaryInMemorySortBuffer usage inefficient, because BinaryInMemorySortBuffer builds an index from the first 8 bytes of the key to speed up sorting (avoiding record serialization/deserialization).
+Here it is possible to use something like FastRecordKeyComparator to trim the column names from the key values.
+
+```Java
+/**
+ * Comparator for record keys used in sorting operations before writing and during data compaction.
+ * Utilizes {@link SignedBytes#lexicographicalComparator()} for high-performance lexicographical comparison.
+ *
+ * <p>Writer uses this class for sorting data before writing, while the Compaction service uses it
+ * for determining the order of records.
+ */
+public class FastRecordKeyComparator implements Comparator<String> {
+  Comparator<byte[]> comparator = SignedBytes.lexicographicalComparator();
+
+  /**
+   * Compares two string representing full record keys.
+   *
+   * @return Negative if a < b, positive if a > b, 0 if equal.
+   */
+  @Override
+  public int compare(String recordKeyA, String recordKeyB) {
+    return compareBytesWithoutColumnName(
+        getRecordKeyBytes(recordKeyA),
+        getRecordKeyBytes(recordKeyB));
+  }
+
+  /**
+   * Compares two byte arrays representing record keys without column names and delimiters.
+   * 
+   * <p>These byte arrays should be a result of {@link #getRecordKeyBytes}
+   *
+   * @return Negative if a < b, positive if a > b, 0 if equal.
+   */
+  public int compareBytesWithoutColumnName(byte[] a, byte[] b) {
+    return comparator.compare(a, b);
+  }
+
+  /**
+   * Converts a record key into a byte array by extracting only values (without column name)
+   * and encoding them in UTF-8.
+   *
+   * @return Byte array representing record key without column names and delimiters; these byte arrays
+   * should only be compared using the {@link #compareBytesWithoutColumnName(byte[], byte[])} method.
+   */
+  public byte[] getRecordKeyBytes(String recordKey) {
+    return String.join("", KeyGenUtils.extractRecordKeys(recordKey))
+        .getBytes(StandardCharsets.UTF_8);
+  }
+}
+```
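+
+For illustration, a short hypothetical usage example follows; the key literals only mimic the `columnName:value` shape produced by ComplexKeyGenerator.
+
+```Java
+public class FastRecordKeyComparatorExample {
+  public static void main(String[] args) {
+    FastRecordKeyComparator comparator = new FastRecordKeyComparator();
+
+    // Column names and delimiters are stripped before comparison:
+    // "id:1,name:bob" -> "1bob", "id:2,name:ann" -> "2ann".
+    int cmp = comparator.compare("id:1,name:bob", "id:2,name:ann");
+
+    // Prints true: "1bob" is lexicographically smaller than "2ann".
+    System.out.println(cmp < 0);
+  }
+}
+```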
+
+### FlinkAppendHandle / FlinkCreateHandle
+
+The key idea here is to use the iterator provided by DataBucket instead of List<HoodieRecord>.
+
+For FlinkAppendHandle, flushToDiskIfRequired looks redundant, because storage and size calculation could be provided by DataBucket.
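+
+A rough sketch of how a write handle could drain the DataBucket iterator is shown below; the RowWriter interface is only an illustrative stand-in for whatever per-record write call the reworked handles end up exposing.
+
+```Java
+import java.io.IOException;
+
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.util.MutableObjectIterator;
+
+/** Sketch only: drains a DataBucket's sort-buffer iterator into a per-record writer. */
+final class BucketDrainSketch {
+
+  /** Illustrative stand-in for the handle's per-record write call. */
+  interface RowWriter {
+    void write(BinaryRowData row) throws IOException;
+  }
+
+  static void drain(MutableObjectIterator<BinaryRowData> iterator, RowWriter writer) throws IOException {
+    BinaryRowData row;
+    // next() returns null once the sort buffer is exhausted.
+    while ((row = iterator.next()) != null) {
+      writer.write(row);
+    }
+  }
+}
+```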

Review Comment:
   I suggest managing the buffer size in the DataBucket itself; it's simple, just call getOccupancy() on the BinaryInMemorySortBuffer.
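
   Roughly (sketch only, reusing the sortBuffer field from the DataBucket example above, with an illustrative threshold parameter):

   ```Java
   public boolean isFull(long maxBufferSizeBytes) {
     // getOccupancy() reports the bytes currently held by the sort buffer,
     // so DataBucket can decide when to flush without a separate BufferSizeDetector.
     return this.sortBuffer.getOccupancy() >= maxBufferSizeBytes;
   }
   ```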


