[
https://issues.apache.org/jira/browse/HUDI-9331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Danny Chen updated HUDI-9331:
-----------------------------
Fix Version/s: 1.1.0
> Incorrect memory estimation for CDC block flushing can lead to OOM
> ------------------------------------------------------------------
>
> Key: HUDI-9331
> URL: https://issues.apache.org/jira/browse/HUDI-9331
> Project: Apache Hudi
> Issue Type: Bug
> Components: cdc
> Reporter: Moran
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.1.0
>
>
> When writing the CDC (Change Data Capture) log, Hudi accumulates records in
> memory until the total estimated size reaches {{maxBlockSize}}. The flush
> condition is based on the formula:
> {code:java}
> numOfCDCRecordsInMemory.get() * averageCDCRecordSize >= maxBlockSize
> {code}
> However, the value of {{averageCDCRecordSize}} is estimated only once, during
> the first write of CDC data:
> {code:java}
> if (cdcData.isEmpty()) {
>   averageCDCRecordSize = sizeEstimator.sizeEstimate(payload);
> }
> {code}
> This approach can lead to underestimation of memory usage. For instance, if
> the first CDC record is relatively small but subsequent records are much
> larger, the estimated average size remains inaccurately low. As a result, the
> number of records held in memory can grow far beyond what would actually fit
> in {{maxBlockSize}}, potentially leading to an OutOfMemoryError (OOM) before
> the flush is triggered.
> {code:java}
> 25/04/16 12:58:49 ERROR BaseSparkCommitActionExecutor: Error upserting bucketType UPDATE for partition :41843
> java.lang.OutOfMemoryError
>   at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
>   at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
>   at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
>   at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
>   at java.io.DataOutputStream.write(DataOutputStream.java:107)
>   at java.io.ByteArrayOutputStream.writeTo(ByteArrayOutputStream.java:167)
>   at org.apache.hudi.common.table.log.block.HoodieAvroDataBlock.serializeRecords(HoodieAvroDataBlock.java:129)
>   at org.apache.hudi.common.table.log.block.HoodieDataBlock.getContentBytes(HoodieDataBlock.java:128)
>   at org.apache.hudi.common.table.log.HoodieLogFormatWriter.appendBlocks(HoodieLogFormatWriter.java:154)
>   at org.apache.hudi.io.HoodieCDCLogger.flushIfNeeded(HoodieCDCLogger.java:196)
>   at org.apache.hudi.io.HoodieCDCLogger.close(HoodieCDCLogger.java:229)
>   at org.apache.hudi.io.HoodieMergeHandleWithChangeLog.close(HoodieMergeHandleWithChangeLog.java:123)
>   at org.apache.hudi.table.action.commit.BaseMergeHelper$UpdateHandler.finish(BaseMergeHelper.java:59)
>   at org.apache.hudi.table.action.commit.BaseMergeHelper$UpdateHandler.finish(BaseMergeHelper.java:44)
>   at org.apache.hudi.common.util.queue.SimpleExecutor.execute(SimpleExecutor.java:72)
>   at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:149)
>   at org.apache.hudi.table.HoodieSparkTable.runMerge(HoodieSparkTable.java:147)
>   at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:351)
>   at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:346)
>   at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:312)
>   at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1(BaseSparkCommitActionExecutor.java:252)
> {code}
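> A minimal sketch of the direction a fix could take: accumulate the estimated size of every buffered record (rather than multiplying the count by a one-time estimate), so the flush threshold tracks actual payload sizes. This is a standalone illustration, not Hudi's code; the class name {{CdcBufferSketch}} and the byte-length {{sizeEstimate}} are hypothetical stand-ins for {{HoodieCDCLogger}} and its {{SizeEstimator}}.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: maintain a running total of per-record size estimates
// instead of estimating the average once on the first record.
public class CdcBufferSketch {
    private final long maxBlockSize;
    private final List<byte[]> cdcData = new ArrayList<>();
    private long totalEstimatedSize = 0L;

    public CdcBufferSketch(long maxBlockSize) {
        this.maxBlockSize = maxBlockSize;
    }

    /** Buffer a record and report whether the block should be flushed. */
    public boolean addAndShouldFlush(byte[] payload) {
        // Re-estimate on every record so large later records raise the total.
        totalEstimatedSize += sizeEstimate(payload);
        cdcData.add(payload);
        return totalEstimatedSize >= maxBlockSize;
    }

    private long sizeEstimate(byte[] payload) {
        // Stand-in for a real size estimator; here just the payload length.
        return payload.length;
    }

    public int bufferedRecords() {
        return cdcData.size();
    }
}
```

> With this shape, a small first record no longer anchors the estimate: a subsequent large record immediately pushes {{totalEstimatedSize}} past the threshold and triggers a flush.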
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)