danny0405 commented on code in PR #18843:
URL: https://github.com/apache/hudi/pull/18843#discussion_r3303487238
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java:
##########
@@ -672,17 +702,28 @@ private void bufferDelete(HoodieRecord<T> hoodieRecord) {
/**
* Checks if the number of records have reached the set threshold and then
flushes the records to disk.
+ *
+ * <p>{@code bufferedRecord} is the record that was just appended to {@link
#recordList} by
+ * {@link #writeToBuffer} (or {@code null} for delete/ignored windows where
{@code recordList}
+ * did not grow). Sizing this object — rather than the incoming pre-{@code
prepareRecord}
+ * record — keeps {@link #averageRecordSize} aligned with what is actually
retained in heap,
+ * which matters on Spark engines where the incoming {@code
HoodieSparkRecord}/{@code UnsafeRow}
+ * is many times smaller than the buffered {@code HoodieAvroIndexedRecord}.
*/
- protected void flushToDiskIfRequired(HoodieRecord record, boolean
appendDeleteBlocks) {
- if (numberOfRecords >= (int) (maxBlockSize / averageRecordSize)
- || numberOfRecords % NUMBER_OF_RECORDS_TO_ESTIMATE_RECORD_SIZE == 0) {
- averageRecordSize = (long) (averageRecordSize * 0.8 +
sizeEstimator.sizeEstimate(record) * 0.2);
+ protected void flushToDiskIfRequired(HoodieRecord bufferedRecord, boolean
appendDeleteBlocks) {
+ if (bufferedRecord != null
+ && (averageRecordSize == 0
+ || numberOfRecords >= (int) (maxBlockSize /
Math.max(averageRecordSize, 1))
+ || numberOfRecords % NUMBER_OF_RECORDS_TO_ESTIMATE_RECORD_SIZE ==
0)) {
+ long sampled = sizeEstimator.sizeEstimate(bufferedRecord);
+ averageRecordSize = averageRecordSize == 0
+ ? sampled
+ : (long) (averageRecordSize * 0.8 + sampled * 0.2);
}
- // Append if max number of records reached to achieve block size
- if (numberOfRecords >= (maxBlockSize / averageRecordSize)) {
- // Recompute averageRecordSize before writing a new block and update
existing value with
- // avg of new and old
+ // Append if max number of records reached to achieve block size.
+ // Skip when averageRecordSize is still 0 (delete-only prefix before any
insert/update).
+ if (averageRecordSize > 0 && numberOfRecords >= (maxBlockSize /
averageRecordSize)) {
Review Comment:
`averageRecordSize > 0` is this right? we should allow pure delete block to
flush? the delete record buffer also takes small amount of memory too.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]