This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new f01113f0497 [HUDI-6892] ExternalSpillableMap may cause data duplication when flink compaction (#9778)
f01113f0497 is described below

commit f01113f0497bc6ce92c536b1828debf928d42ad0
Author: llincc <[email protected]>
AuthorDate: Mon Oct 2 23:00:22 2023 +0800

    [HUDI-6892] ExternalSpillableMap may cause data duplication when flink compaction (#9778)
---
 .../util/collection/ExternalSpillableMap.java      |  4 ++
 .../util/collection/TestExternalSpillableMap.java  | 50 ++++++++++++++++++++++
 2 files changed, 54 insertions(+)

diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java
index bbda80ea0a3..3d5fd1d5754 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java
@@ -215,6 +215,10 @@ public class ExternalSpillableMap<T extends Serializable, 
R extends Serializable
       this.inMemoryMap.put(key, value);
     } else if (this.currentInMemoryMapSize < this.maxInMemorySizeInBytes) {
       this.currentInMemoryMapSize += this.estimatedPayloadSize;
+      // Remove the old version of the record from disk first to avoid data duplication.
+      if (inDiskContainsKey(key)) {
+        getDiskBasedMap().remove(key);
+      }
       this.inMemoryMap.put(key, value);
     } else {
       getDiskBasedMap().put(key, value);
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestExternalSpillableMap.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestExternalSpillableMap.java
index 4cd34dbdab1..c3178709d1a 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestExternalSpillableMap.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestExternalSpillableMap.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.model.HoodieAvroPayload;
 import org.apache.hudi.common.model.HoodieAvroRecord;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
@@ -31,6 +32,7 @@ import org.apache.hudi.common.testutils.SpillableMapTestUtils;
 import org.apache.hudi.common.util.DefaultSizeEstimator;
 import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.SizeEstimator;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
@@ -47,6 +49,7 @@ import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 import java.util.stream.Collectors;
@@ -381,6 +384,53 @@ public class TestExternalSpillableMap extends 
HoodieCommonTestHarness {
     });
   }
 
+  @ParameterizedTest
+  @MethodSource("testArguments")
+  public void 
testDataCorrectnessWithRecordExistsInDiskMapAndThenUpsertToMem(ExternalSpillableMap.DiskMapType
 diskMapType,
+                                                  boolean 
isCompressionEnabled) throws IOException, URISyntaxException {
+    Schema schema = 
HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
+
+    SizeEstimator keyEstimator = new DefaultSizeEstimator();
+    SizeEstimator valEstimator = new HoodieRecordSizeEstimator(schema);
+    SchemaTestUtil testUtil = new SchemaTestUtil();
+    List<IndexedRecord> iRecords = testUtil.generateHoodieTestRecords(0, 100);
+
+    // Get the first record
+    IndexedRecord firstRecord = iRecords.get(0);
+    String key = ((GenericRecord) 
firstRecord).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
+    String partitionPath = ((GenericRecord) 
firstRecord).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString();
+    HoodieRecord record =
+        new HoodieAvroRecord<>(new HoodieKey(key, partitionPath), new 
HoodieAvroPayload(Option.of((GenericRecord) firstRecord)));
+    record.setCurrentLocation(new 
HoodieRecordLocation(SpillableMapTestUtils.DUMMY_COMMIT_TIME, 
SpillableMapTestUtils.DUMMY_FILE_ID));
+    record.seal();
+
+    // Estimate the first record size and calculate the total memory size that the in-memory map can only contain 100 records.
+    long estimatedPayloadSize = keyEstimator.sizeEstimate(key) + 
valEstimator.sizeEstimate(record);
+    long totalEstimatedSizeWith100Records = (long) ((estimatedPayloadSize * 
100) / 0.8);
+    ExternalSpillableMap<String, HoodieRecord<? extends HoodieRecordPayload>> 
records =
+        new ExternalSpillableMap<>(totalEstimatedSizeWith100Records, basePath, 
new DefaultSizeEstimator(),
+            new HoodieRecordSizeEstimator(schema), diskMapType, 
isCompressionEnabled);
+
+    // Insert 100 records and then in-memory map will contain 100 records.
+    SpillableMapTestUtils.upsertRecords(iRecords, records);
+
+    // Generate one record and it will be spilled to disk
+    List<IndexedRecord> singleRecord = testUtil.generateHoodieTestRecords(0, 
1);
+    List<String> singleRecordKey = 
SpillableMapTestUtils.upsertRecords(singleRecord, records);
+
+    // Get the field we want to update
+    String fieldName = schema.getFields().stream().filter(field -> 
field.schema().getType() == Schema.Type.STRING).findAny()
+        .get().name();
+    HoodieRecord hoodieRecord = records.get(singleRecordKey.get(0));
+    // Use a new value to update this field, the estimate size of this record will be less than the first record.
+    String newValue = "";
+    HoodieRecord updatedRecord =
+        
SchemaTestUtil.updateHoodieTestRecordsWithoutHoodieMetadata(Arrays.asList(hoodieRecord),
 schema, fieldName, newValue).get(0);
+    records.put(updatedRecord.getRecordKey(), updatedRecord);
+
+    assertEquals(records.size(), 101);
+  }
+
   private static Stream<Arguments> testArguments() {
     // Arguments : 1. Disk Map Type 2. isCompressionEnabled for BitCaskMap
     return Stream.of(

Reply via email to