This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new f01113f0497 [HUDI-6892] ExternalSpillableMap may cause data duplication when flink compaction (#9778)
f01113f0497 is described below
commit f01113f0497bc6ce92c536b1828debf928d42ad0
Author: llincc <[email protected]>
AuthorDate: Mon Oct 2 23:00:22 2023 +0800
    [HUDI-6892] ExternalSpillableMap may cause data duplication when flink compaction (#9778)
---
.../util/collection/ExternalSpillableMap.java | 4 ++
.../util/collection/TestExternalSpillableMap.java | 50 ++++++++++++++++++++++
2 files changed, 54 insertions(+)
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java
index bbda80ea0a3..3d5fd1d5754 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java
@@ -215,6 +215,10 @@ public class ExternalSpillableMap<T extends Serializable, R extends Serializable
       this.inMemoryMap.put(key, value);
     } else if (this.currentInMemoryMapSize < this.maxInMemorySizeInBytes) {
       this.currentInMemoryMapSize += this.estimatedPayloadSize;
+      // Remove the old version of the record from disk first to avoid data duplication.
+      if (inDiskContainsKey(key)) {
+        getDiskBasedMap().remove(key);
+      }
       this.inMemoryMap.put(key, value);
     } else {
       getDiskBasedMap().put(key, value);
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestExternalSpillableMap.java b/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestExternalSpillableMap.java
index 4cd34dbdab1..c3178709d1a 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestExternalSpillableMap.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestExternalSpillableMap.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.model.HoodieAvroPayload;
 import org.apache.hudi.common.model.HoodieAvroRecord;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
@@ -31,6 +32,7 @@ import org.apache.hudi.common.testutils.SpillableMapTestUtils;
 import org.apache.hudi.common.util.DefaultSizeEstimator;
 import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.SizeEstimator;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
@@ -47,6 +49,7 @@ import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 import java.util.stream.Collectors;
@@ -381,6 +384,53 @@ public class TestExternalSpillableMap extends HoodieCommonTestHarness {
     });
   }
 
+  @ParameterizedTest
+  @MethodSource("testArguments")
+  public void testDataCorrectnessWithRecordExistsInDiskMapAndThenUpsertToMem(ExternalSpillableMap.DiskMapType diskMapType,
+                                                                             boolean isCompressionEnabled) throws IOException, URISyntaxException {
+    Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
+
+    SizeEstimator keyEstimator = new DefaultSizeEstimator();
+    SizeEstimator valEstimator = new HoodieRecordSizeEstimator(schema);
+    SchemaTestUtil testUtil = new SchemaTestUtil();
+    List<IndexedRecord> iRecords = testUtil.generateHoodieTestRecords(0, 100);
+
+    // Get the first record
+    IndexedRecord firstRecord = iRecords.get(0);
+    String key = ((GenericRecord) firstRecord).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
+    String partitionPath = ((GenericRecord) firstRecord).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString();
+    HoodieRecord record =
+        new HoodieAvroRecord<>(new HoodieKey(key, partitionPath), new HoodieAvroPayload(Option.of((GenericRecord) firstRecord)));
+    record.setCurrentLocation(new HoodieRecordLocation(SpillableMapTestUtils.DUMMY_COMMIT_TIME, SpillableMapTestUtils.DUMMY_FILE_ID));
+    record.seal();
+
+    // Estimate the first record size and calculate the total memory size that the in-memory map can only contain 100 records.
+    long estimatedPayloadSize = keyEstimator.sizeEstimate(key) + valEstimator.sizeEstimate(record);
+    long totalEstimatedSizeWith100Records = (long) ((estimatedPayloadSize * 100) / 0.8);
+    ExternalSpillableMap<String, HoodieRecord<? extends HoodieRecordPayload>> records =
+        new ExternalSpillableMap<>(totalEstimatedSizeWith100Records, basePath, new DefaultSizeEstimator(),
+            new HoodieRecordSizeEstimator(schema), diskMapType, isCompressionEnabled);
+
+    // Insert 100 records and then in-memory map will contain 100 records.
+    SpillableMapTestUtils.upsertRecords(iRecords, records);
+
+    // Generate one record and it will be spilled to disk
+    List<IndexedRecord> singleRecord = testUtil.generateHoodieTestRecords(0, 1);
+    List<String> singleRecordKey = SpillableMapTestUtils.upsertRecords(singleRecord, records);
+
+    // Get the field we want to update
+    String fieldName = schema.getFields().stream().filter(field -> field.schema().getType() == Schema.Type.STRING).findAny()
+        .get().name();
+    HoodieRecord hoodieRecord = records.get(singleRecordKey.get(0));
+    // Use a new value to update this field, the estimate size of this record will be less than the first record.
+    String newValue = "";
+    HoodieRecord updatedRecord =
+        SchemaTestUtil.updateHoodieTestRecordsWithoutHoodieMetadata(Arrays.asList(hoodieRecord), schema, fieldName, newValue).get(0);
+    records.put(updatedRecord.getRecordKey(), updatedRecord);
+
+    assertEquals(records.size(), 101);
+  }
+
   private static Stream<Arguments> testArguments() {
     // Arguments : 1. Disk Map Type 2. isCompressionEnabled for BitCaskMap
     return Stream.of(