[ https://issues.apache.org/jira/browse/HUDI-9331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Danny Chen updated HUDI-9331:
-----------------------------
    Fix Version/s: 1.1.0

> Incorrect memory estimation for CDC block flushing can lead to OOM
> ------------------------------------------------------------------
>
>                 Key: HUDI-9331
>                 URL: https://issues.apache.org/jira/browse/HUDI-9331
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: cdc
>            Reporter: Moran
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.1.0
>
>
> When writing a CDC (Change Data Capture) log, Hudi accumulates records in 
> memory until their total estimated size reaches {{maxBlockSize}}. The flush 
> condition is based on the formula:
> {code:java}
> numOfCDCRecordsInMemory.get() * averageCDCRecordSize >= maxBlockSize {code}
> However, {{averageCDCRecordSize}} is estimated only once, during the first 
> write of CDC data:
> {code:java}
> // The estimate is taken only while the buffer is still empty, i.e. from the
> // very first record, and is never refreshed afterwards.
> if (cdcData.isEmpty()) {
>     averageCDCRecordSize = sizeEstimator.sizeEstimate(payload);
> } {code}
> This one-off estimate can lead to underestimation of memory usage. If the 
> first CDC record happens to be small but subsequent records are much larger, 
> the estimated average stays inaccurately low. As a result, the number of 
> records held in memory can grow far beyond what actually fits in 
> {{maxBlockSize}}, potentially causing an OutOfMemoryError (OOM) before the 
> flush is triggered (see the worked example after the stack trace below):
> {code:java}
> 25/04/16 12:58:49 ERROR BaseSparkCommitActionExecutor: Error upserting bucketType UPDATE for partition :41843
> java.lang.OutOfMemoryError
>     at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
>     at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
>     at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
>     at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
>     at java.io.DataOutputStream.write(DataOutputStream.java:107)
>     at java.io.ByteArrayOutputStream.writeTo(ByteArrayOutputStream.java:167)
>     at org.apache.hudi.common.table.log.block.HoodieAvroDataBlock.serializeRecords(HoodieAvroDataBlock.java:129)
>     at org.apache.hudi.common.table.log.block.HoodieDataBlock.getContentBytes(HoodieDataBlock.java:128)
>     at org.apache.hudi.common.table.log.HoodieLogFormatWriter.appendBlocks(HoodieLogFormatWriter.java:154)
>     at org.apache.hudi.io.HoodieCDCLogger.flushIfNeeded(HoodieCDCLogger.java:196)
>     at org.apache.hudi.io.HoodieCDCLogger.close(HoodieCDCLogger.java:229)
>     at org.apache.hudi.io.HoodieMergeHandleWithChangeLog.close(HoodieMergeHandleWithChangeLog.java:123)
>     at org.apache.hudi.table.action.commit.BaseMergeHelper$UpdateHandler.finish(BaseMergeHelper.java:59)
>     at org.apache.hudi.table.action.commit.BaseMergeHelper$UpdateHandler.finish(BaseMergeHelper.java:44)
>     at org.apache.hudi.common.util.queue.SimpleExecutor.execute(SimpleExecutor.java:72)
>     at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:149)
>     at org.apache.hudi.table.HoodieSparkTable.runMerge(HoodieSparkTable.java:147)
>     at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:351)
>     at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:346)
>     at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:312)
>     at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1(BaseSparkCommitActionExecutor.java:252)
> {code}
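> To put concrete (purely illustrative) numbers on the underestimation, 
> suppose the first record is 100 bytes while every later record is 10 KB, 
> with a 1 MB {{maxBlockSize}}; the names below mirror the fields above but 
> the values are invented for this example:
> {code:java}
> // Illustrative values only; not taken from the report above.
> long averageCDCRecordSize = 100;     // fixed once, from a small first record
> long actualRecordSize = 10 * 1024;   // real size of later records (10 KB)
> long maxBlockSize = 1024 * 1024;     // 1 MB flush threshold
> 
> // Records buffered before the stale-average condition fires:
> long recordsBeforeFlush = maxBlockSize / averageCDCRecordSize;  // 10,485
> // Memory those records actually occupy:
> long actualBytesHeld = recordsBeforeFlush * actualRecordSize;   // ~102 MB
> System.out.println(recordsBeforeFlush + " records held, ~"
>     + actualBytesHeld / (1024 * 1024) + " MB in memory vs a 1 MB budget");
> {code}
> Roughly a hundred times the intended block size can accumulate before the 
> flush condition is ever satisfied.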
>  
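> One way to avoid the stale estimate (a minimal sketch, not necessarily the 
> approach taken in the linked pull request) is to accumulate the estimated 
> size of every record and flush on the running total; 
> {{totalCDCBytesEstimated}} and {{flush()}} are hypothetical here, the other 
> names match the snippets above:
> {code:java}
> // Hypothetical sketch: estimate every record instead of only the first.
> long recordSize = sizeEstimator.sizeEstimate(payload);
> totalCDCBytesEstimated += recordSize;   // running total, no fixed average
> numOfCDCRecordsInMemory.incrementAndGet();
> 
> // Flush once the accumulated estimate reaches the block size budget,
> // regardless of how unrepresentative the first record was.
> if (totalCDCBytesEstimated >= maxBlockSize) {
>     flush();                            // hypothetical: write and clear buffer
>     totalCDCBytesEstimated = 0;
> }
> {code}
> Estimating every record costs more CPU than the one-off estimate; sampling 
> every N-th record and keeping a running average would be a cheaper middle 
> ground.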



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
