[ https://issues.apache.org/jira/browse/HUDI-9331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Moran updated HUDI-9331:
------------------------
Description:
When writing a CDC (Change Data Capture) log, Hudi accumulates records in memory
until their total estimated size reaches {{maxBlockSize}}. The flush condition is
based on the formula:
{code:java}
numOfCDCRecordsInMemory.get() * averageCDCRecordSize >= maxBlockSize
{code}
However, {{averageCDCRecordSize}} is estimated only once, when the first CDC record
is written:
{code:java}
if (cdcData.isEmpty()) {
  averageCDCRecordSize = sizeEstimator.sizeEstimate(payload);
}
{code}
This approach can severely underestimate memory usage. If the first CDC record
happens to be small but subsequent records are much larger, the estimated average
size stays inaccurately low. As a result, the number of records buffered in memory
can grow far beyond what actually fits in {{maxBlockSize}}, and the writer can hit
an OutOfMemoryError (OOM) before the flush is ever triggered.
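To make the effect concrete, here is a small stand-alone simulation of the flush
check above with a stale average. It is not Hudi code; the record sizes and the
threshold are made up for illustration:
{code:java}
public class StaleAverageFlushDemo {
  public static void main(String[] args) {
    long maxBlockSize = 2L * 1024 * 1024;   // 2 MB flush threshold
    long firstRecordSize = 200;             // small first CDC record
    long laterRecordSize = 20L * 1024;      // much larger subsequent records

    long averageCDCRecordSize = firstRecordSize; // estimated once, never refreshed
    long numRecords = 0;
    long actualBytesBuffered = 0;

    // Keep buffering until the (stale) estimate claims the block is full.
    while (numRecords * averageCDCRecordSize < maxBlockSize) {
      actualBytesBuffered += (numRecords == 0) ? firstRecordSize : laterRecordSize;
      numRecords++;
    }

    // With these made-up numbers: ~10,486 records and roughly 200 MB buffered
    // before the 2 MB threshold is ever reached.
    System.out.printf("flush after %d records, ~%d MB actually buffered (threshold: %d MB)%n",
        numRecords, actualBytesBuffered / (1024 * 1024), maxBlockSize / (1024 * 1024));
  }
}
{code}
With numbers like these, the check does not trip until roughly 200 MB are buffered
against a 2 MB target. In the reported case the writer then fails while serializing
the oversized block: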
{code:java}
25/04/16 12:58:49 ERROR BaseSparkCommitActionExecutor: Error upserting bucketType UPDATE for partition :41843
java.lang.OutOfMemoryError
    at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
    at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
    at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
    at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
    at java.io.DataOutputStream.write(DataOutputStream.java:107)
    at java.io.ByteArrayOutputStream.writeTo(ByteArrayOutputStream.java:167)
    at org.apache.hudi.common.table.log.block.HoodieAvroDataBlock.serializeRecords(HoodieAvroDataBlock.java:129)
    at org.apache.hudi.common.table.log.block.HoodieDataBlock.getContentBytes(HoodieDataBlock.java:128)
    at org.apache.hudi.common.table.log.HoodieLogFormatWriter.appendBlocks(HoodieLogFormatWriter.java:154)
    at org.apache.hudi.io.HoodieCDCLogger.flushIfNeeded(HoodieCDCLogger.java:196)
    at org.apache.hudi.io.HoodieCDCLogger.close(HoodieCDCLogger.java:229)
    at org.apache.hudi.io.HoodieMergeHandleWithChangeLog.close(HoodieMergeHandleWithChangeLog.java:123)
    at org.apache.hudi.table.action.commit.BaseMergeHelper$UpdateHandler.finish(BaseMergeHelper.java:59)
    at org.apache.hudi.table.action.commit.BaseMergeHelper$UpdateHandler.finish(BaseMergeHelper.java:44)
    at org.apache.hudi.common.util.queue.SimpleExecutor.execute(SimpleExecutor.java:72)
    at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:149)
    at org.apache.hudi.table.HoodieSparkTable.runMerge(HoodieSparkTable.java:147)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:351)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:346)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:312)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1(BaseSparkCommitActionExecutor.java:252)
{code}
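One possible direction, sketched here purely as an illustration (the class and
method names below are hypothetical and not the existing {{HoodieCDCLogger}} API),
is to accumulate an estimate per buffered record instead of freezing the average
at the first record:
{code:java}
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only -- not the actual HoodieCDCLogger implementation.
public class RunningSizeCdcBuffer {
  private final long maxBlockSize;
  private final List<byte[]> cdcData = new ArrayList<>();
  private long estimatedBytesInMemory = 0L;

  public RunningSizeCdcBuffer(long maxBlockSize) {
    this.maxBlockSize = maxBlockSize;
  }

  /** Buffers one serialized CDC payload and reports whether a flush is due. */
  public boolean add(byte[] payload) {
    cdcData.add(payload);
    // Accumulate the size of each record as it arrives (here simply the
    // serialized length), rather than multiplying a one-time estimate
    // by the record count.
    estimatedBytesInMemory += payload.length;
    return estimatedBytesInMemory >= maxBlockSize;
  }

  public void flush() {
    // Write the buffered block here, then reset the counters.
    cdcData.clear();
    estimatedBytesInMemory = 0L;
  }
}
{code}
A lighter-weight variant would be to re-sample the size estimate periodically (for
example every N buffered records) and keep the existing count-times-average check.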
> Incorrect memory estimation for CDC block flushing can lead to OOM
> ------------------------------------------------------------------
>
> Key: HUDI-9331
> URL: https://issues.apache.org/jira/browse/HUDI-9331
> Project: Apache Hudi
> Issue Type: Bug
> Components: cdc
> Reporter: Moran
> Priority: Major
> Labels: pull-request-available
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)