nsivabalan opened a new issue, #18859:
URL: https://github.com/apache/hudi/issues/18859

   ## Tips before filing an issue
   
   - Have you gone through our [FAQs](https://hudi.apache.org/learn/faq/)? Yes.
   - Join the mailing list to engage in conversations and get faster support at 
[email protected]. Yes.
   - If you have triaged this as a bug, then file an 
[issue](https://issues.apache.org/jira/projects/HUDI/issues) directly. This is 
filed as an improvement.
   
   ## Describe the problem you faced
   
   `HoodieAppendHandle` accumulates records in an in-memory `recordList` until 
the block-flush gate in `flushToDiskIfRequired` fires. The gate's heap bound 
today is purely `hoodie.logfile.data.block.max.size` (`maxBlockSize`, default 
256 MB) — no per-task memory awareness.
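To make the current bound concrete, here is a simplified, hypothetical model of the count-based gate. It is not Hudi's `HoodieAppendHandle`; `FlushGateModel` and its methods are illustrative names. It assumes the buffer flushes once the record count reaches `maxBlockSize / averageRecordSize`. Note that nothing in the check consults per-task memory.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the append-handle flush gate; not Hudi's HoodieAppendHandle.
final class FlushGateModel {
    private final long maxBlockSize;      // bytes; stands in for hoodie.logfile.data.block.max.size
    private final long averageRecordSize; // estimated bytes per buffered record
    private final List<byte[]> recordList = new ArrayList<>(); // in-memory buffer
    private int blocksFlushed = 0;

    FlushGateModel(long maxBlockSize, long averageRecordSize) {
        this.maxBlockSize = maxBlockSize;
        this.averageRecordSize = averageRecordSize;
    }

    /** Buffers one record, then flushes once the count-based threshold is reached. */
    void write(byte[] record) {
        recordList.add(record);
        // The only bound is maxBlockSize: the per-task heap budget is never consulted.
        if (recordList.size() >= maxBlockSize / averageRecordSize) {
            blocksFlushed++;    // a real writer would serialize and append a log block here
            recordList.clear();
        }
    }

    int blocksFlushed() { return blocksFlushed; }

    int bufferedRecords() { return recordList.size(); }
}
```

With 10,000-byte blocks and 1,000-byte records the gate fires every 10 records, so after 25 writes two blocks have been flushed and 5 records remain buffered, regardless of how much heap the task actually has.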
   
   When operators raise `hoodie.logfile.data.block.max.size`, or when an executor's per-task heap is smaller than the configured block size (tight tasks, many concurrent append handles per task on metadata writes, etc.), the in-memory buffer can exceed the actual per-task heap budget, and the task OOMs during downstream HFile/Parquet serialization.
   
   Concretely: on a Spark executor with 2 GB / 4 cores and the default `spark.memory.fraction=0.6`, each task gets ~205 MB of user memory after applying `(1 - spark.memory.fraction)` and dividing by cores, and ~120 MB once a 0.6 budget fraction (the existing `hoodie.memory.merge.fraction` default) is applied on top. But each `HoodieAppendHandle` is allowed to retain up to `maxBlockSize` (256 MB by default) worth of records, already 2× that budget before counting the concurrent handles, the serialization peak, or any merge/spillable-map allocations sharing the same per-task heap.
   
   PR #18843 (in flight) makes `averageRecordSize` accurate by sizing the 
*buffered* post-`prepareRecord` record rather than the incoming 
pre-`prepareRecord` one. With that fix, the gate finally fires at the right 
number of records — but it still fires at `maxBlockSize`, not at what the task 
can actually afford.
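The effect of the estimate on the buffer can be sketched with plain arithmetic. This is a hypothetical illustration, not Hudi code: the 200-byte "incoming" size and 2,000-byte "buffered" size are assumed values chosen only to show the shape of the problem.

```java
// Hypothetical arithmetic sketch; names and record sizes are illustrative.
final class EstimateMismatch {
    static final long MB = 1024L * 1024L;

    /** Records the gate admits before firing, derived from the *estimated* record size. */
    static long recordsBeforeFlush(long maxBlockSize, long estimatedRecordSize) {
        return maxBlockSize / estimatedRecordSize;
    }

    /** Heap actually retained at flush time, using the true size of each buffered record. */
    static long retainedBytesAtFlush(long maxBlockSize, long estimatedRecordSize,
                                     long bufferedRecordSize) {
        return recordsBeforeFlush(maxBlockSize, estimatedRecordSize) * bufferedRecordSize;
    }
}
```

With a 256 MB block, estimating from a 200-byte incoming record while each buffered record really occupies 2,000 bytes retains roughly 2.5 GB (close to 10× the block size) before the gate fires. With an accurate 2,000-byte estimate the buffer stays just under 256 MB, which is still above the ~120 MB a tight task can afford.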
   
   ## To Reproduce
   
   Steps to reproduce the symptom on a small-executor Spark cluster:
   
   1. Configure a Spark job with tight executors (e.g., `spark.executor.memory=2g`, `spark.executor.cores=4`, `spark.memory.fraction=0.6`, which leaves ~205 MB of user memory per task, or ~120 MB after a 0.6 budget fraction).
   2. Set `hoodie.logfile.data.block.max.size=536870912` (512 MB), or keep the default of 268435456 (256 MB).
   3. Run a MOR upsert workload that writes wide records (e.g., metadata-table 
RLI writes with many fields, or any payload where the materialized Avro 
`IndexedRecord` is multi-KB per record).
   4. Observe OOM in `HoodieAppendHandle.flushToDiskIfRequired` or in the 
downstream `HoodieHFileDataBlock.serializeRecords` / `HoodieAvroDataBlock` 
serialization path.
   
   Note: PR #18843 alone is insufficient — it makes the gate fire at the right 
*number of records* for `maxBlockSize`, but the absolute byte budget is still 
too high for a small task.
   
   **Expected behavior**
   
   The append-handle buffer should be bounded by 
`min(hoodie.logfile.data.block.max.size, available-per-task-memory)`. The same 
engine-property infrastructure `IOUtils.getMaxMemoryAllowedForMerge` already 
uses for merge/compaction should drive the cap.
   
   **Environment Description**
   
   - Hudi version: master (1.x)
   - Spark version: any
   - Hadoop version: any
   - Storage: any
   - Running on Docker: not relevant
   
   ## Proposed change
   
   Introduce a dynamic per-task memory cap on the append-handle buffer, applied 
as a ceiling on top of `hoodie.logfile.data.block.max.size`:
   
   ```
   effectiveBlockSize = min(hoodie.logfile.data.block.max.size, dynamicCeiling)
   ```
   
   Where:
   
   ```
   dynamicCeiling = executorMemory * (1 - spark.memory.fraction) / task_slots
                      * hoodie.memory.logfile.append.fraction
   ```
   
   floored at a small lower bound. We propose 16 MB, smaller than the 100 MB spillable-map floor, because many append handles can be active concurrently in a single task and we would rather accept small blocks on tight executors than OOM.
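The formula and floor can be sketched as follows, assuming exactly the terms above. `AppendCeiling`, `dynamicCeiling`, and `effectiveBlockSize` here are hypothetical names for illustration, not an existing Hudi API.

```java
// Minimal sketch of the proposed cap; class and method names are hypothetical.
final class AppendCeiling {
    static final long MB = 1024L * 1024L;

    /** executorMemory * (1 - sparkMemoryFraction) / taskSlots * appendFraction, floored at minFloor. */
    static long dynamicCeiling(long executorMemoryBytes, double sparkMemoryFraction,
                               long taskSlots, double appendFraction, long minFloor) {
        double perTaskUserMemory = executorMemoryBytes * (1 - sparkMemoryFraction) / taskSlots;
        long ceiling = (long) Math.floor(perTaskUserMemory * appendFraction);
        return Math.max(minFloor, ceiling);
    }

    /** The static block-size config stays an upper bound; the dynamic ceiling can only lower it. */
    static long effectiveBlockSize(long maxBlockSize, long dynamicCeiling) {
        return Math.min(maxBlockSize, dynamicCeiling);
    }
}
```

For the 2 GB / 4-core executor with `spark.memory.fraction=0.6` and an append fraction of 0.6, the ceiling comes out at roughly 123 MB, so `effectiveBlockSize` drops from 256 MB to about 123 MB. On a 256 MB executor with 8 task slots the raw value is under 8 MB, and the 16 MB floor takes over.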
   
   ### New config
   
   - `hoodie.memory.logfile.append.fraction` (default `0.6`, advanced) — 
fraction of per-task user memory available to the append-handle buffer. Mirrors 
the existing `hoodie.memory.merge.fraction` and 
`hoodie.memory.compaction.fraction` defaults.
   
   ### Implementation sketch
   
   - Add a sibling helper 
`IOUtils.getMaxMemoryAllowedForLogAppend(taskContextSupplier, fraction, 
minFloor): Option<Long>` next to the existing `getMaxMemoryAllowedForMerge`. 
Distinct in two ways: returns `Option` (so callers can fall back to a static 
cap rather than the spillable-map 1GB default when engine properties are 
absent), and accepts the floor as a parameter (so the 100 MB spillable-map 
floor and a smaller log-append floor cannot collide). Honors 
`EngineProperty.SINGLE_TASK_CORES` like the existing helper.
   - In `HoodieAppendHandle`, compute `effectiveBlockSize = min(maxBlockSize, 
dynamicCeiling)` once in the constructor; swap the two `maxBlockSize` 
references in `flushToDiskIfRequired` to consult `effectiveBlockSize`.
   - When the engine does not expose memory/cores (Flink's 
`TaskContextSupplier.getProperty` returns `Option.empty()` unconditionally; 
Spark also returns empty if `SparkEnv` is absent), the helper returns 
`Option.empty()` and `effectiveBlockSize` collapses to `maxBlockSize` — no 
behavior change on those engines.
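A minimal sketch of the `Option`-returning helper and the constructor-time fallback, using `java.util.Optional` in place of Hudi's `Option`. Several parts are assumptions: a plain `Map` stands in for `TaskContextSupplier.getProperty`, the string keys only mirror the engine-property names, and the slot count is assumed to be executor cores divided by cores per task.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch; a Map stands in for TaskContextSupplier.getProperty and the keys are illustrative.
final class LogAppendBudget {

    /** Empty when the engine exposes no memory/cores info, so callers keep their static cap. */
    static Optional<Long> maxMemoryForLogAppend(Map<String, String> engineProps,
                                               double appendFraction, long minFloor) {
        String totalMemory = engineProps.get("TOTAL_MEMORY_AVAILABLE");
        String memoryFraction = engineProps.get("MEMORY_FRACTION_IN_USE");
        String executorCores = engineProps.get("TOTAL_CORES_PER_EXECUTOR");
        if (totalMemory == null || memoryFraction == null || executorCores == null) {
            return Optional.empty();
        }
        // Assumed slot derivation: executor cores / cores per task (default 1), at least one slot.
        long singleTaskCores = Long.parseLong(engineProps.getOrDefault("SINGLE_TASK_CORES", "1"));
        long taskSlots = Math.max(1L, Long.parseLong(executorCores) / Math.max(1L, singleTaskCores));
        double perTaskUserMemory =
            Long.parseLong(totalMemory) * (1 - Double.parseDouble(memoryFraction)) / taskSlots;
        long ceiling = (long) Math.floor(perTaskUserMemory * appendFraction);
        // The floor is a parameter, so it cannot collide with the spillable-map floor.
        return Optional.of(Math.max(minFloor, ceiling));
    }

    /** Computed once per handle; with no engine info it collapses to maxBlockSize. */
    static long effectiveBlockSize(long maxBlockSize, Optional<Long> dynamicCeiling) {
        return dynamicCeiling.map(c -> Math.min(maxBlockSize, c)).orElse(maxBlockSize);
    }
}
```

Returning an empty value, rather than a large default, is what lets an engine without memory info keep today's `maxBlockSize` behavior unchanged.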
   
   ### Behavior
   
   - **Avro engine writes**: unaffected, since the heap pressure being 
addressed is most acute on Spark engines where `prepareRecord` materializes the 
Avro graph.
   - **Spark engine writes**: the gate trips earlier on tight executors, so the buffer stays within the per-task heap budget and downstream serialization OOMs are avoided.
   - **Flink**: unchanged.
   - `canWrite` / `estimatedNumberOfBytesWritten` are out of scope for this 
issue — those still use `maxBlockSize` indirectly to roll log-file groups. The 
proposal is intentionally scoped to the flush gate only.
   - Small Spark executors will see smaller, more numerous log blocks, an intentional tradeoff against OOM. The extra blocks are appended to the same log files, so the on-disk file count is unaffected; readers and compaction merge across blocks as they do today.
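A back-of-the-envelope sketch of that tradeoff, assuming every block is filled up to the effective block size. `BlockCount` is a hypothetical helper, not Hudi code.

```java
// Hypothetical arithmetic sketch of the block-count tradeoff; names are illustrative.
final class BlockCount {
    static final long MB = 1024L * 1024L;

    /** Blocks needed to flush bufferedBytes when each block holds at most blockSize bytes. */
    static long blocksFor(long bufferedBytes, long blockSize) {
        return (bufferedBytes + blockSize - 1) / blockSize; // ceiling division
    }
}
```

Flushing 1 GB of records takes 4 blocks at the 256 MB default and 9 blocks at a ~120 MB cap. That is more block headers and more blocks to scan, in exchange for never holding more than one capped block's worth of records in memory.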
   
   ### Stacking
   
   This change depends on PR #18843 to be useful — without that, 
`averageRecordSize` is still computed off the incoming (compact/deflated) 
record and the cap is moot. The PR for this issue will be stacked on top of 
#18843.
   
   A PR is in flight; will link here once raised.
   
   ## Stacktrace
   
   OOM symptom seen in production on the metadata RLI append path:
   
   ```
   java.lang.OutOfMemoryError: Java heap space
     at org.apache.hudi.common.table.log.block.HoodieHFileDataBlock.serializeRecords(...)
     at org.apache.hudi.common.table.log.block.HoodieDataBlock.getContentBytes(...)
     at org.apache.hudi.common.table.log.HoodieLogFormatWriter.appendBlock(...)
     at org.apache.hudi.io.HoodieAppendHandle.appendDataAndDeleteBlocks(...)
     at org.apache.hudi.io.HoodieAppendHandle.flushToDiskIfRequired(HoodieAppendHandle.java:...)
   ```
   
   Root cause: `recordList` grew well past one block's worth of heap before the 
gate fired, and the subsequent serialization couldn't fit.

