nsivabalan opened a new issue, #18844:
URL: https://github.com/apache/hudi/issues/18844

   ### Tips before filing an issue
   
   - Have you gone through our [FAQs](https://hudi.apache.org/learn/faq/)? Yes
   - Join the mailing list to engage in conversations and get faster support at 
[email protected]. Aware
   - If you have triaged this as a bug, then file an 
[issue](https://issues.apache.org/jira/projects/HUDI/issues) directly. Filing 
here first for visibility / discussion.
   
   ### Describe the problem you faced
   
   In `HoodieAppendHandle`, the block-flush gate inside `flushToDiskIfRequired` 
sizes the **incoming** `HoodieRecord` to compute `averageRecordSize`, but the 
record that is actually retained in `recordList` (the in-memory buffer that the 
gate is supposed to bound) is the post-`prepareRecord` clone produced by 
`bufferInsertAndUpdate` — i.e. `populatedRecord.copy()` after 
`prependMetaFields`. On the Spark engine these two objects are very different 
in heap size:
   
   - Incoming: `HoodieSparkRecord` wrapping a compact `UnsafeRow` (hundreds of 
bytes).
   - Buffered (heap-retained): `HoodieAvroIndexedRecord` wrapping a 
fully-deserialized Avro `IndexedRecord` (many KB per record).
   
   Because the exponentially weighted moving average (EWMA) behind 
`averageRecordSize` is fed the wrong (small) size, the flush trigger
   
   ```java
   numberOfRecords >= maxBlockSize / averageRecordSize
   ```
   
   fires far later than it should. The result is that `recordList` accumulates 
many times more heap than `maxBlockSize` would predict, and the subsequent 
`HFileDataBlock` / Avro block serialization OOMs on metadata-table (RLI 
partition) writes. We have seen this in production: the on-disk log block is 
~150 MB GZ-compressed but ~2 GB on heap is retained by the time serialization 
starts.
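
To make the arithmetic concrete, here is a minimal sketch of the gate as described above. It is a simplified model, not Hudi source: the class and method names are hypothetical, and the per-record sizes (512 B incoming, 4 KiB buffered) are illustrative values chosen for round arithmetic, not measurements from the incident.

```java
// Simplified model of the flush gate; not Hudi source code.
final class FlushGateSketch {

    // Number of records the gate admits before firing:
    // numberOfRecords >= maxBlockSize / averageRecordSize.
    static long recordsBeforeFlush(long maxBlockSize, long averageRecordSize) {
        return maxBlockSize / averageRecordSize;
    }

    // Heap held by the buffer at flush time when the gate believes each record
    // costs estimatedSize bytes but each buffered record really costs retainedSize.
    static long retainedHeapAtFlush(long maxBlockSize, long estimatedSize, long retainedSize) {
        return recordsBeforeFlush(maxBlockSize, estimatedSize) * retainedSize;
    }

    public static void main(String[] args) {
        long maxBlockSize = 256L * 1024 * 1024; // 256 MB block max size
        long incomingSize = 512;                // hypothetical size of the incoming record
        long bufferedSize = 4096;               // hypothetical size of the buffered record

        // Gate fed the incoming size: the buffer overshoots the block size.
        System.out.println(retainedHeapAtFlush(maxBlockSize, incomingSize, bufferedSize));
        // Gate fed the buffered size: the buffer stays at about one block.
        System.out.println(retainedHeapAtFlush(maxBlockSize, bufferedSize, bufferedSize));
    }
}
```

With these assumed sizes the overshoot equals the ratio of buffered to incoming record size (8x here), which is why the gap grows with how much larger the deserialized record is than the compact one.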
   
   The same wrong-object sizing also exists at the initial seed in `init()` 
(`averageRecordSize = sizeEstimator.sizeEstimate(record);`), which runs against 
the incoming record before any `prepareRecord` conversion.
   
   For the Avro engine the incoming and post-prepare shapes are similar, so the 
symptom is Spark-specific.
   
   ### To Reproduce
   
   Steps to reproduce the behavior:
   
   1. Write to a Hudi MOR table with the Spark engine.
   2. Use a workload that produces metadata-table updates large enough to fill 
the configured `hoodie.logfile.data.block.max.size` (default 256 MB). 
Metadata-table RLI partition writes are a reliable trigger.
   3. Observe heap usage during the append: `recordList` retains many times the 
configured block-max-size before `flushToDiskIfRequired` fires, and the 
subsequent HFile/Avro block serialization peaks at 5–10x the configured 
block-max-size.
   4. With sufficient incoming record cardinality on a small executor heap, the 
writer OOMs.
   
   ### Expected behavior
   
   `averageRecordSize` should reflect the size of the records actually being 
retained in `recordList` (i.e., the post-`prepareRecord` objects). The buffer 
should never exceed roughly `maxBlockSize` worth of heap before the flush gate 
fires.
   
   ### Environment Description
   
   - Hudi version: master (1.x). The same bug exists in 0.14.x, where the 
structure is the same except that a single `writeToBuffer` takes the place of 
the 1.x `bufferRecord` / `bufferInsertAndUpdate` split.
   - Spark version: 3.5 (also reproduces on 3.4)
   - Storage (HDFS/S3/GCS..): S3
   - Running on Docker?: No
   
   ### Additional context
   
   The earlier perf work in [HUDI-XXXX / 
apache/hudi#14078](https://github.com/apache/hudi/pull/14078) ("Reduce memory 
usage of writing HFile log block") reduced peak heap **during** HFileDataBlock 
serialization (drain-on-serialize + sort-once streaming). That helped, but the 
upstream problem is that `recordList` was allowed to grow many times past one 
block's worth in the first place, because the gate's estimate was sized against 
the wrong object. This fix addresses that upstream cause; the two changes are 
complementary.
   
   Fix shape (PR #18843):
   
   - `writeToBuffer` / `bufferRecord` / `bufferInsertAndUpdate` return the 
buffered (post-`prependMetaFields`) record (or `null` for delete/ignored/error 
paths).
   - The 3 call sites (`doAppend`, `doWrite`, `write(Map)`) flip order: buffer 
first, then check the flush gate with the buffered record.
   - `flushToDiskIfRequired` sizes the buffered record. The initial seed 
becomes lazy, taken when `averageRecordSize == 0` (replacing the wrong eager 
seed in `init()`), and a div-by-zero guard covers delete-only prefixes.
   - 6 unit tests in a new `TestHoodieAppendHandle`.
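
The fix shape above can be sketched roughly as follows. This is a hedged illustration, not the code in the PR: `BufferedSizeGateSketch`, its fields, the 4:1 integer blend used for the running average, and the choice to count every `write` call in `numberOfRecords` are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.ToLongFunction;

// Hypothetical stand-in for the append handle's buffering path; not Hudi source code.
final class BufferedSizeGateSketch<R> {
    private final long maxBlockSize;
    private final ToLongFunction<R> sizeEstimator;
    private final List<R> recordList = new ArrayList<>();
    private long averageRecordSize = 0; // seeded lazily, not eagerly at init time
    private long numberOfRecords = 0;   // assumption: counts every write call
    private int flushCount = 0;

    BufferedSizeGateSketch(long maxBlockSize, ToLongFunction<R> sizeEstimator) {
        this.maxBlockSize = maxBlockSize;
        this.sizeEstimator = sizeEstimator;
    }

    // buffered is the post-prepare record, or null for delete/ignored/error paths.
    void write(R buffered) {
        numberOfRecords++;
        if (buffered != null) {
            recordList.add(buffered); // buffer first
        }
        flushIfRequired(buffered);    // then check the gate with the buffered record
    }

    private void flushIfRequired(R buffered) {
        if (averageRecordSize == 0 && buffered != null) {
            averageRecordSize = sizeEstimator.applyAsLong(buffered); // lazy seed
        }
        if (averageRecordSize <= 0) {
            return; // delete-only prefix: nothing sized yet, so avoid dividing by zero
        }
        if (numberOfRecords >= maxBlockSize / averageRecordSize) {
            if (buffered != null) {
                // Blend the latest buffered size into the running average
                // (the 4:1 weighting is an assumption of this sketch).
                averageRecordSize =
                    (4 * averageRecordSize + sizeEstimator.applyAsLong(buffered)) / 5;
            }
            recordList.clear(); // stands in for serializing and appending the block
            numberOfRecords = 0;
            flushCount++;
        }
    }

    int bufferedCount() { return recordList.size(); }

    int flushCount() { return flushCount; }
}
```

Because the estimator only ever sees objects that were actually added to `recordList`, the threshold `maxBlockSize / averageRecordSize` tracks retained heap rather than the size of the incoming record.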
   
   ### Stacktrace
   
   ```
   N/A — symptom is an OOM under heap pressure rather than a typed exception. 
The retained-heap distribution under `recordList` is visible in heap dumps; in 
the original incident, `recordList` held ~2 GB on heap for a log block that 
compressed to ~150 MB on disk.
   ```

