This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 927e5bf4ea [HUDI-4753] more accurate record size estimation for log
writing and spillable map (#6632)
927e5bf4ea is described below
commit 927e5bf4ea8c34c8f729eb404d27d2a32a593900
Author: Yuwei XIAO <[email protected]>
AuthorDate: Tue Oct 25 11:06:57 2022 +0800
[HUDI-4753] more accurate record size estimation for log writing and
spillable map (#6632)
---
.../org/apache/hudi/io/HoodieAppendHandle.java | 9 +++++++--
.../util/collection/ExternalSpillableMap.java | 22 ++++++++--------------
2 files changed, 15 insertions(+), 16 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
index 8db927d569..418c221c1b 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
@@ -84,6 +84,7 @@ public class HoodieAppendHandle<T extends
HoodieRecordPayload, I, K, O> extends
private static final Logger LOG =
LogManager.getLogger(HoodieAppendHandle.class);
// This acts as the sequenceID for records written
private static final AtomicLong RECORD_COUNTER = new AtomicLong(1);
+ private static final int NUMBER_OF_RECORDS_TO_ESTIMATE_RECORD_SIZE = 100;
protected final String fileId;
// Buffer for holding records in memory before they are flushed to disk
@@ -559,12 +560,16 @@ public class HoodieAppendHandle<T extends
HoodieRecordPayload, I, K, O> extends
* Checks if the number of records have reached the set threshold and then
flushes the records to disk.
*/
private void flushToDiskIfRequired(HoodieRecord record, boolean
appendDeleteBlocks) {
+ if (numberOfRecords >= (int) (maxBlockSize / averageRecordSize)
+ || numberOfRecords % NUMBER_OF_RECORDS_TO_ESTIMATE_RECORD_SIZE == 0) {
+ averageRecordSize = (long) (averageRecordSize * 0.8 +
sizeEstimator.sizeEstimate(record) * 0.2);
+ }
+
// Append if max number of records reached to achieve block size
if (numberOfRecords >= (int) (maxBlockSize / averageRecordSize)) {
// Recompute averageRecordSize before writing a new block and update
existing value with
// avg of new and old
- LOG.info("AvgRecordSize => " + averageRecordSize);
- averageRecordSize = (averageRecordSize +
sizeEstimator.sizeEstimate(record)) / 2;
+ LOG.info("Flush log block to disk, the current avgRecordSize => " +
averageRecordSize);
// Delete blocks will be appended after appending all the data blocks.
appendDataAndDeleteBlocks(header, appendDeleteBlocks);
estimatedNumberOfBytesWritten += averageRecordSize * numberOfRecords;
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java
b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java
index 8d2707d604..218f0d9f16 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java
@@ -18,7 +18,6 @@
package org.apache.hudi.common.util.collection;
-import org.apache.hudi.common.util.ObjectSizeCalculator;
import org.apache.hudi.common.util.SizeEstimator;
import org.apache.hudi.exception.HoodieIOException;
@@ -78,8 +77,6 @@ public class ExternalSpillableMap<T extends Serializable, R
extends Serializable
private Long currentInMemoryMapSize;
// An estimate of the size of each payload written to this map
private volatile long estimatedPayloadSize = 0;
- // Flag to determine whether to stop re-estimating payload size
- private boolean shouldEstimatePayloadSize = true;
// Base File Path
private final String baseFilePath;
@@ -202,22 +199,19 @@ public class ExternalSpillableMap<T extends Serializable,
R extends Serializable
@Override
public R put(T key, R value) {
+ if (this.currentInMemoryMapSize >= maxInMemorySizeInBytes ||
inMemoryMap.size() % NUMBER_OF_RECORDS_TO_ESTIMATE_PAYLOAD_SIZE == 0) {
+ this.estimatedPayloadSize = (long) (this.estimatedPayloadSize * 0.9
+ + (keySizeEstimator.sizeEstimate(key) +
valueSizeEstimator.sizeEstimate(value)) * 0.1);
+ this.currentInMemoryMapSize = this.inMemoryMap.size() *
this.estimatedPayloadSize;
+ LOG.info("Update Estimated Payload size to => " +
this.estimatedPayloadSize);
+ }
+
if (this.currentInMemoryMapSize < maxInMemorySizeInBytes ||
inMemoryMap.containsKey(key)) {
- if (shouldEstimatePayloadSize && estimatedPayloadSize == 0) {
+ if (estimatedPayloadSize == 0) {
// At first, use the sizeEstimate of a record being inserted into the
spillable map.
// Note, the converter may over estimate the size of a record in the
JVM
this.estimatedPayloadSize = keySizeEstimator.sizeEstimate(key) +
valueSizeEstimator.sizeEstimate(value);
LOG.info("Estimated Payload size => " + estimatedPayloadSize);
- } else if (shouldEstimatePayloadSize && !inMemoryMap.isEmpty()
- && (inMemoryMap.size() % NUMBER_OF_RECORDS_TO_ESTIMATE_PAYLOAD_SIZE
== 0)) {
- // Re-estimate the size of a record by calculating the size of the
entire map containing
- // N entries and then dividing by the number of entries present (N).
This helps to get a
- // correct estimation of the size of each record in the JVM.
- long totalMapSize = ObjectSizeCalculator.getObjectSize(inMemoryMap);
- this.currentInMemoryMapSize = totalMapSize;
- this.estimatedPayloadSize = totalMapSize / inMemoryMap.size();
- shouldEstimatePayloadSize = false;
- LOG.info("New Estimated Payload size => " + this.estimatedPayloadSize);
}
if (!inMemoryMap.containsKey(key)) {
// TODO : Add support for adjusting payloadSize for updates to the
same key