This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 2b3d0fa4ffe [HUDI-7238] Bug fixes and optimization of ExternalSpillableMap (#10344)
2b3d0fa4ffe is described below

commit 2b3d0fa4ffe5067bbef5025c5a6fcc257e26fb0f
Author: Tim Brown <[email protected]>
AuthorDate: Fri Jan 26 21:07:40 2024 -0600

    [HUDI-7238] Bug fixes and optimization of ExternalSpillableMap (#10344)
---
 .../java/org/apache/hudi/io/HoodieCDCLogger.java   |   2 +-
 .../java/org/apache/hudi/io/HoodieMergeHandle.java |   5 +-
 .../view/SpillableMapBasedFileSystemView.java      |  23 +
 .../common/util/collection/BitCaskDiskMap.java     |   5 +-
 .../hudi/common/util/collection/DiskMap.java       |   5 +-
 .../util/collection/ExternalSpillableMap.java      |  95 ++--
 .../util/collection/TestExternalSpillableMap.java  | 522 +++++++++++----------
 .../realtime/HoodieMergeOnReadSnapshotReader.java  |   3 +
 8 files changed, 370 insertions(+), 290 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java
index 1e2fa7c59e4..9c78c5aae7c 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java
@@ -235,7 +235,7 @@ public class HoodieCDCLogger implements Closeable {
       throw new HoodieIOException("Failed to close HoodieCDCLogger", e);
     } finally {
       // in case that crash when call `flushIfNeeded`, do the cleanup again.
-      cdcData.clear();
+      cdcData.close();
     }
   }
 
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index 79c80e1a5af..ae1f1dd1da4 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -57,6 +57,7 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.concurrent.NotThreadSafe;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashSet;
@@ -426,8 +427,8 @@ public class HoodieMergeHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O>
       markClosed();
       writeIncomingRecords();
 
-      if (keyToNewRecords instanceof ExternalSpillableMap) {
-        ((ExternalSpillableMap) keyToNewRecords).close();
+      if (keyToNewRecords instanceof Closeable) {
+        ((Closeable) keyToNewRecords).close();
       }
 
       keyToNewRecords = null;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/SpillableMapBasedFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/SpillableMapBasedFileSystemView.java
index 75d29870a5a..95ba9dfd8f6 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/SpillableMapBasedFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/SpillableMapBasedFileSystemView.java
@@ -216,4 +216,27 @@ public class SpillableMapBasedFileSystemView extends HoodieTableFileSystemView {
 
     fileIdsToRemove.forEach(fileGroupId -> 
fgIdToReplaceInstants.remove(fileGroupId));
   }
+
+  @Override
+  public void close() {
+    super.close();
+    if (partitionToFileGroupsMap != null) {
+      ((ExternalSpillableMap) partitionToFileGroupsMap).close();
+    }
+    if (fgIdToPendingClustering != null) {
+      ((ExternalSpillableMap) fgIdToPendingClustering).close();
+    }
+    if (fgIdToPendingCompaction != null) {
+      ((ExternalSpillableMap) fgIdToPendingCompaction).close();
+    }
+    if (fgIdToPendingLogCompaction != null) {
+      ((ExternalSpillableMap) fgIdToPendingLogCompaction).close();
+    }
+    if (fgIdToBootstrapBaseFile != null) {
+      ((ExternalSpillableMap) fgIdToBootstrapBaseFile).close();
+    }
+    if (fgIdToReplaceInstants != null) {
+      ((ExternalSpillableMap) fgIdToReplaceInstants).close();
+    }
+  }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/BitCaskDiskMap.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/BitCaskDiskMap.java
index 56787b0c3d1..2f1595bfe1d 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/BitCaskDiskMap.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/BitCaskDiskMap.java
@@ -218,8 +218,8 @@ public final class BitCaskDiskMap<T extends Serializable, R extends Serializable
     try {
       byte[] val = isCompressionEnabled ? 
DISK_COMPRESSION_REF.get().compressBytes(SerializationUtils.serialize(value)) :
           SerializationUtils.serialize(value);
-      Integer valueSize = val.length;
-      Long timestamp = System.currentTimeMillis();
+      int valueSize = val.length;
+      long timestamp = System.currentTimeMillis();
       this.valueMetadataMap.put(key,
           new BitCaskDiskMap.ValueMetadata(this.filePath, valueSize, 
filePosition.get(), timestamp));
       byte[] serializedKey = SerializationUtils.serialize(key);
@@ -271,6 +271,7 @@ public final class BitCaskDiskMap<T extends Serializable, R extends Serializable
         fileOutputStream.getChannel().force(false);
         writeOnlyFileHandle.close();
       }
+      fileOutputStream.close();
 
       while (!openedAccessFiles.isEmpty()) {
         BufferedRandomAccessFile file = openedAccessFiles.poll();
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/DiskMap.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/DiskMap.java
index 31a34148592..c8d57aec032 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/DiskMap.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/DiskMap.java
@@ -63,7 +63,10 @@ public abstract class DiskMap<T extends Serializable, R extends Serializable> im
    * (typically 4 KB) to disk.
    */
   private void addShutDownHook() {
-    shutdownThread = new Thread(this::cleanup);
+    shutdownThread = new Thread(() -> {
+      LOG.warn("Failed to properly close DiskMap in application");
+      cleanup();
+    });
     Runtime.getRuntime().addShutdownHook(shutdownThread);
   }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java
index 3d5fd1d5754..5df8f97d4b9 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java
@@ -26,6 +26,7 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.concurrent.NotThreadSafe;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -54,7 +55,7 @@ import java.util.stream.Stream;
  * frequently and incur unnecessary disk writes.
  */
 @NotThreadSafe
-public class ExternalSpillableMap<T extends Serializable, R extends 
Serializable> implements Map<T, R>, Serializable {
+public class ExternalSpillableMap<T extends Serializable, R extends 
Serializable> implements Map<T, R>, Serializable, Closeable {
 
   // Find the actual estimated payload size after inserting N records
   private static final int NUMBER_OF_RECORDS_TO_ESTIMATE_PAYLOAD_SIZE = 100;
@@ -67,7 +68,7 @@ public class ExternalSpillableMap<T extends Serializable, R extends Serializable
   private transient volatile DiskMap<T, R> diskBasedMap;
   // TODO(na) : a dynamic sizing factor to ensure we have space for other 
objects in memory and
   // incorrect payload estimation
-  private final Double sizingFactorForInMemoryMap = 0.8;
+  private static final double SIZING_FACTOR_FOR_IN_MEMORY_MAP = 0.8;
   // Size Estimator for key type
   private final SizeEstimator<T> keySizeEstimator;
   // Size Estimator for key types
@@ -77,27 +78,27 @@ public class ExternalSpillableMap<T extends Serializable, R extends Serializable
   // Enables compression of values stored in disc
   private final boolean isCompressionEnabled;
   // current space occupied by this map in-memory
-  private Long currentInMemoryMapSize;
+  private long currentInMemoryMapSize;
   // An estimate of the size of each payload written to this map
   private volatile long estimatedPayloadSize = 0;
   // Base File Path
   private final String baseFilePath;
 
-  public ExternalSpillableMap(Long maxInMemorySizeInBytes, String 
baseFilePath, SizeEstimator<T> keySizeEstimator,
+  public ExternalSpillableMap(long maxInMemorySizeInBytes, String 
baseFilePath, SizeEstimator<T> keySizeEstimator,
                               SizeEstimator<R> valueSizeEstimator) throws 
IOException {
     this(maxInMemorySizeInBytes, baseFilePath, keySizeEstimator, 
valueSizeEstimator, DiskMapType.BITCASK);
   }
 
-  public ExternalSpillableMap(Long maxInMemorySizeInBytes, String 
baseFilePath, SizeEstimator<T> keySizeEstimator,
+  public ExternalSpillableMap(long maxInMemorySizeInBytes, String 
baseFilePath, SizeEstimator<T> keySizeEstimator,
                               SizeEstimator<R> valueSizeEstimator, DiskMapType 
diskMapType) throws IOException {
     this(maxInMemorySizeInBytes, baseFilePath, keySizeEstimator, 
valueSizeEstimator, diskMapType, false);
   }
 
-  public ExternalSpillableMap(Long maxInMemorySizeInBytes, String 
baseFilePath, SizeEstimator<T> keySizeEstimator,
+  public ExternalSpillableMap(long maxInMemorySizeInBytes, String 
baseFilePath, SizeEstimator<T> keySizeEstimator,
                               SizeEstimator<R> valueSizeEstimator, DiskMapType 
diskMapType, boolean isCompressionEnabled) throws IOException {
     this.inMemoryMap = new HashMap<>();
     this.baseFilePath = baseFilePath;
-    this.maxInMemorySizeInBytes = (long) Math.floor(maxInMemorySizeInBytes * 
sizingFactorForInMemoryMap);
+    this.maxInMemorySizeInBytes = (long) Math.floor(maxInMemorySizeInBytes * 
SIZING_FACTOR_FOR_IN_MEMORY_MAP);
     this.currentInMemoryMapSize = 0L;
     this.keySizeEstimator = keySizeEstimator;
     this.valueSizeEstimator = valueSizeEstimator;
@@ -105,7 +106,7 @@ public class ExternalSpillableMap<T extends Serializable, R extends Serializable
     this.isCompressionEnabled = isCompressionEnabled;
   }
 
-  private DiskMap<T, R> getDiskBasedMap() {
+  private void initDiskBasedMap() {
     if (null == diskBasedMap) {
       synchronized (this) {
         if (null == diskBasedMap) {
@@ -116,7 +117,7 @@ public class ExternalSpillableMap<T extends Serializable, R extends Serializable
                 break;
               case BITCASK:
               default:
-                diskBasedMap =  new BitCaskDiskMap<>(baseFilePath, 
isCompressionEnabled);
+                diskBasedMap = new BitCaskDiskMap<>(baseFilePath, 
isCompressionEnabled);
             }
           } catch (IOException e) {
             throw new HoodieIOException(e.getMessage(), e);
@@ -124,28 +125,28 @@ public class ExternalSpillableMap<T extends Serializable, R extends Serializable
         }
       }
     }
-    return diskBasedMap;
   }
 
   /**
    * A custom iterator to wrap over iterating in-memory + disk spilled data.
    */
   public Iterator<R> iterator() {
-    return new IteratorWrapper<>(inMemoryMap.values().iterator(), 
getDiskBasedMap().iterator());
+
+    return diskBasedMap == null ? inMemoryMap.values().iterator() : new 
IteratorWrapper<>(inMemoryMap.values().iterator(), diskBasedMap.iterator());
   }
 
   /**
    * Number of entries in BitCaskDiskMap.
    */
   public int getDiskBasedMapNumEntries() {
-    return getDiskBasedMap().size();
+    return diskBasedMap == null ? 0 : diskBasedMap.size();
   }
 
   /**
    * Number of bytes spilled to disk.
    */
   public long getSizeOfFileOnDiskInBytes() {
-    return getDiskBasedMap().sizeOfFileOnDiskInBytes();
+    return diskBasedMap == null ? 0 : diskBasedMap.sizeOfFileOnDiskInBytes();
   }
 
   /**
@@ -164,38 +165,38 @@ public class ExternalSpillableMap<T extends Serializable, R extends Serializable
 
   @Override
   public int size() {
-    return inMemoryMap.size() + getDiskBasedMap().size();
+    return inMemoryMap.size() + getDiskBasedMapNumEntries();
   }
 
   @Override
   public boolean isEmpty() {
-    return inMemoryMap.isEmpty() && getDiskBasedMap().isEmpty();
+    return inMemoryMap.isEmpty() && getDiskBasedMapNumEntries() == 0;
   }
 
   @Override
   public boolean containsKey(Object key) {
-    return inMemoryMap.containsKey(key) || getDiskBasedMap().containsKey(key);
+    return inMemoryMap.containsKey(key) || inDiskContainsKey(key);
   }
 
   @Override
   public boolean containsValue(Object value) {
-    return inMemoryMap.containsValue(value) || 
getDiskBasedMap().containsValue(value);
+    return inMemoryMap.containsValue(value) || (diskBasedMap != null && 
diskBasedMap.containsValue(value));
   }
 
-  public boolean inMemoryContainsKey(Object key) {
+  private boolean inMemoryContainsKey(Object key) {
     return inMemoryMap.containsKey(key);
   }
 
-  public boolean inDiskContainsKey(Object key) {
-    return getDiskBasedMap().containsKey(key);
+  private boolean inDiskContainsKey(Object key) {
+    return diskBasedMap != null && diskBasedMap.containsKey(key);
   }
 
   @Override
   public R get(Object key) {
     if (inMemoryMap.containsKey(key)) {
       return inMemoryMap.get(key);
-    } else if (getDiskBasedMap().containsKey(key)) {
-      return getDiskBasedMap().get(key);
+    } else if (inDiskContainsKey(key)) {
+      return diskBasedMap.get(key);
     }
     return null;
   }
@@ -217,11 +218,14 @@ public class ExternalSpillableMap<T extends Serializable, R extends Serializable
       this.currentInMemoryMapSize += this.estimatedPayloadSize;
       // Remove the old version of the record from disk first to avoid data 
duplication.
       if (inDiskContainsKey(key)) {
-        getDiskBasedMap().remove(key);
+        diskBasedMap.remove(key);
       }
       this.inMemoryMap.put(key, value);
     } else {
-      getDiskBasedMap().put(key, value);
+      if (diskBasedMap == null) {
+        initDiskBasedMap();
+      }
+      diskBasedMap.put(key, value);
     }
     return value;
   }
@@ -232,8 +236,8 @@ public class ExternalSpillableMap<T extends Serializable, R extends Serializable
     if (inMemoryMap.containsKey(key)) {
       currentInMemoryMapSize -= estimatedPayloadSize;
       return inMemoryMap.remove(key);
-    } else if (getDiskBasedMap().containsKey(key)) {
-      return getDiskBasedMap().remove(key);
+    } else if (inDiskContainsKey(key)) {
+      return diskBasedMap.remove(key);
     }
     return null;
   }
@@ -248,43 +252,60 @@ public class ExternalSpillableMap<T extends Serializable, R extends Serializable
   @Override
   public void clear() {
     inMemoryMap.clear();
-    getDiskBasedMap().clear();
+    if (diskBasedMap != null) {
+      diskBasedMap.clear();
+    }
     currentInMemoryMapSize = 0L;
   }
 
   public void close() {
     inMemoryMap.clear();
-    getDiskBasedMap().close();
+    if (diskBasedMap != null) {
+      diskBasedMap.close();
+    }
     currentInMemoryMapSize = 0L;
   }
 
   @Override
   public Set<T> keySet() {
-    Set<T> keySet = new HashSet<T>();
+    if (diskBasedMap == null) {
+      return inMemoryMap.keySet();
+    }
+    Set<T> keySet = new HashSet<>(inMemoryMap.size() + diskBasedMap.size());
     keySet.addAll(inMemoryMap.keySet());
-    keySet.addAll(getDiskBasedMap().keySet());
+    keySet.addAll(diskBasedMap.keySet());
     return keySet;
   }
 
   @Override
   public Collection<R> values() {
-    if (getDiskBasedMap().isEmpty()) {
+    if (diskBasedMap == null) {
       return inMemoryMap.values();
     }
-    List<R> result = new ArrayList<>(inMemoryMap.values());
-    result.addAll(getDiskBasedMap().values());
+    List<R> result = new ArrayList<>(inMemoryMap.size() + diskBasedMap.size());
+    result.addAll(inMemoryMap.values());
+    result.addAll(diskBasedMap.values());
     return result;
   }
 
   public Stream<R> valueStream() {
-    return Stream.concat(inMemoryMap.values().stream(), 
getDiskBasedMap().valueStream());
+    if (diskBasedMap == null) {
+      return inMemoryMap.values().stream();
+    }
+    return Stream.concat(inMemoryMap.values().stream(), 
diskBasedMap.valueStream());
   }
 
   @Override
   public Set<Entry<T, R>> entrySet() {
-    Set<Entry<T, R>> entrySet = new HashSet<>();
-    entrySet.addAll(inMemoryMap.entrySet());
-    entrySet.addAll(getDiskBasedMap().entrySet());
+    if (diskBasedMap == null) {
+      return inMemoryMap.entrySet();
+    }
+    Set<Entry<T, R>> inMemory = inMemoryMap.entrySet();
+    Set<Entry<T, R>> onDisk = diskBasedMap.entrySet();
+
+    Set<Entry<T, R>> entrySet = new HashSet<>(inMemory.size() + onDisk.size());
+    entrySet.addAll(inMemory);
+    entrySet.addAll(onDisk);
     return entrySet;
   }
 
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestExternalSpillableMap.java b/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestExternalSpillableMap.java
index 2b651ecfd9c..ed64ee00287 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestExternalSpillableMap.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestExternalSpillableMap.java
@@ -43,6 +43,7 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestMethodOrder;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.EnumSource;
 import org.junit.jupiter.params.provider.MethodSource;
 
 import java.io.IOException;
@@ -81,33 +82,34 @@ public class TestExternalSpillableMap extends HoodieCommonTestHarness {
   public void simpleInsertTest(ExternalSpillableMap.DiskMapType diskMapType, 
boolean isCompressionEnabled) throws IOException, URISyntaxException {
     Schema schema = 
HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
 
-    ExternalSpillableMap<String, HoodieRecord<? extends HoodieRecordPayload>> 
records =
+    try (ExternalSpillableMap<String, HoodieRecord<? extends 
HoodieRecordPayload>> records =
         new ExternalSpillableMap<>(16L, basePath, new DefaultSizeEstimator(),
-            new HoodieRecordSizeEstimator(schema), diskMapType, 
isCompressionEnabled); // 16B
+            new HoodieRecordSizeEstimator(schema), diskMapType, 
isCompressionEnabled)) { // 16B
 
-    SchemaTestUtil testUtil = new SchemaTestUtil();
-    List<IndexedRecord> iRecords = testUtil.generateHoodieTestRecords(0, 100);
-    List<String> recordKeys = SpillableMapTestUtils.upsertRecords(iRecords, 
records);
-    assert (recordKeys.size() == 100);
-
-    // Test iterator
-    Iterator<HoodieRecord<? extends HoodieRecordPayload>> itr = 
records.iterator();
-    int cntSize = 0;
-    while (itr.hasNext()) {
-      HoodieRecord<? extends HoodieRecordPayload> rec = itr.next();
-      cntSize++;
-      assert recordKeys.contains(rec.getRecordKey());
-    }
-    assertEquals(recordKeys.size(), cntSize);
-
-    // Test value stream
-    List<HoodieRecord<? extends HoodieRecordPayload>> values = 
records.valueStream().collect(Collectors.toList());
-    cntSize = 0;
-    for (HoodieRecord value : values) {
-      assert recordKeys.contains(value.getRecordKey());
-      cntSize++;
+      SchemaTestUtil testUtil = new SchemaTestUtil();
+      List<IndexedRecord> iRecords = testUtil.generateHoodieTestRecords(0, 
100);
+      List<String> recordKeys = SpillableMapTestUtils.upsertRecords(iRecords, 
records);
+      assert (recordKeys.size() == 100);
+
+      // Test iterator
+      Iterator<HoodieRecord<? extends HoodieRecordPayload>> itr = 
records.iterator();
+      int cntSize = 0;
+      while (itr.hasNext()) {
+        HoodieRecord<? extends HoodieRecordPayload> rec = itr.next();
+        cntSize++;
+        assert recordKeys.contains(rec.getRecordKey());
+      }
+      assertEquals(recordKeys.size(), cntSize);
+
+      // Test value stream
+      List<HoodieRecord<? extends HoodieRecordPayload>> values = 
records.valueStream().collect(Collectors.toList());
+      cntSize = 0;
+      for (HoodieRecord value : values) {
+        assert recordKeys.contains(value.getRecordKey());
+        cntSize++;
+      }
+      assertEquals(recordKeys.size(), cntSize);
     }
-    assertEquals(recordKeys.size(), cntSize);
   }
 
   @ParameterizedTest
@@ -115,38 +117,39 @@ public class TestExternalSpillableMap extends HoodieCommonTestHarness {
   public void testSimpleUpsert(ExternalSpillableMap.DiskMapType diskMapType, 
boolean isCompressionEnabled) throws IOException, URISyntaxException {
     Schema schema = 
HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
 
-    ExternalSpillableMap<String, HoodieRecord<? extends HoodieRecordPayload>> 
records =
+    try (ExternalSpillableMap<String, HoodieRecord<? extends 
HoodieRecordPayload>> records =
         new ExternalSpillableMap<>(16L, basePath, new DefaultSizeEstimator(),
-            new HoodieRecordSizeEstimator(schema), diskMapType, 
isCompressionEnabled); // 16B
+            new HoodieRecordSizeEstimator(schema), diskMapType, 
isCompressionEnabled)) { // 16B
 
-    SchemaTestUtil testUtil = new SchemaTestUtil();
-    List<IndexedRecord> iRecords = testUtil.generateHoodieTestRecords(0, 100);
-    List<String> recordKeys = SpillableMapTestUtils.upsertRecords(iRecords, 
records);
-    assert (recordKeys.size() == 100);
-    Iterator<HoodieRecord<? extends HoodieRecordPayload>> itr = 
records.iterator();
-    while (itr.hasNext()) {
-      HoodieRecord<? extends HoodieRecordPayload> rec = itr.next();
-      assert recordKeys.contains(rec.getRecordKey());
-    }
+      SchemaTestUtil testUtil = new SchemaTestUtil();
+      List<IndexedRecord> iRecords = testUtil.generateHoodieTestRecords(0, 
100);
+      List<String> recordKeys = SpillableMapTestUtils.upsertRecords(iRecords, 
records);
+      assert (recordKeys.size() == 100);
+      Iterator<HoodieRecord<? extends HoodieRecordPayload>> itr = 
records.iterator();
+      while (itr.hasNext()) {
+        HoodieRecord<? extends HoodieRecordPayload> rec = itr.next();
+        assert recordKeys.contains(rec.getRecordKey());
+      }
 
-    List<IndexedRecord> updatedRecords = 
SchemaTestUtil.updateHoodieTestRecords(recordKeys,
-        testUtil.generateHoodieTestRecords(0, 100), 
InProcessTimeGenerator.createNewInstantTime());
+      List<IndexedRecord> updatedRecords = 
SchemaTestUtil.updateHoodieTestRecords(recordKeys,
+          testUtil.generateHoodieTestRecords(0, 100), 
InProcessTimeGenerator.createNewInstantTime());
 
-    // update records already inserted
-    SpillableMapTestUtils.upsertRecords(updatedRecords, records);
+      // update records already inserted
+      SpillableMapTestUtils.upsertRecords(updatedRecords, records);
 
-    // make sure we have records spilled to disk
-    assertTrue(records.getDiskBasedMapNumEntries() > 0);
+      // make sure we have records spilled to disk
+      assertTrue(records.getDiskBasedMapNumEntries() > 0);
 
-    // iterate over the updated records and compare the value from Map
-    updatedRecords.forEach(record -> {
-      HoodieRecord rec = records.get(((GenericRecord) 
record).get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
-      try {
-        assertEquals(((HoodieAvroRecord) 
rec).getData().getInsertValue(schema).get(), record);
-      } catch (IOException io) {
-        throw new UncheckedIOException(io);
-      }
-    });
+      // iterate over the updated records and compare the value from Map
+      updatedRecords.forEach(record -> {
+        HoodieRecord rec = records.get(((GenericRecord) 
record).get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
+        try {
+          assertEquals(((HoodieAvroRecord) 
rec).getData().getInsertValue(schema).get(), record);
+        } catch (IOException io) {
+          throw new UncheckedIOException(io);
+        }
+      });
+    }
   }
 
   @ParameterizedTest
@@ -156,56 +159,57 @@ public class TestExternalSpillableMap extends HoodieCommonTestHarness {
     Schema schema = 
HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
     String payloadClazz = HoodieAvroPayload.class.getName();
 
-    ExternalSpillableMap<String, HoodieRecord<? extends HoodieRecordPayload>> 
records =
+    try (ExternalSpillableMap<String, HoodieRecord<? extends 
HoodieRecordPayload>> records =
         new ExternalSpillableMap<>(16L, basePath, new DefaultSizeEstimator(),
-            new HoodieRecordSizeEstimator(schema), diskMapType, 
isCompressionEnabled); // 16B
+            new HoodieRecordSizeEstimator(schema), diskMapType, 
isCompressionEnabled)) { // 16B
 
-    SchemaTestUtil testUtil = new SchemaTestUtil();
-    List<IndexedRecord> iRecords = testUtil.generateHoodieTestRecords(0, 100);
-    // insert a bunch of records so that values spill to disk too
-    List<String> recordKeys = SpillableMapTestUtils.upsertRecords(iRecords, 
records);
-    IndexedRecord inMemoryRecord = iRecords.get(0);
-    String ikey = ((GenericRecord) 
inMemoryRecord).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
-    String iPartitionPath = ((GenericRecord) 
inMemoryRecord).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString();
-    HoodieRecord inMemoryHoodieRecord = new HoodieAvroRecord<>(new 
HoodieKey(ikey, iPartitionPath),
-        new HoodieAvroPayload(Option.of((GenericRecord) inMemoryRecord)));
-
-    IndexedRecord onDiskRecord = iRecords.get(99);
-    String dkey = ((GenericRecord) 
onDiskRecord).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
-    String dPartitionPath = ((GenericRecord) 
onDiskRecord).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString();
-    HoodieRecord onDiskHoodieRecord = new HoodieAvroRecord<>(new 
HoodieKey(dkey, dPartitionPath),
-        new HoodieAvroPayload(Option.of((GenericRecord) onDiskRecord)));
-    // assert size
-    assert records.size() == 100;
-    // get should return the same HoodieKey, same location and same value
-    assert inMemoryHoodieRecord.getKey().equals(records.get(ikey).getKey());
-    assert onDiskHoodieRecord.getKey().equals(records.get(dkey).getKey());
-    // compare the member variables of HoodieRecord not set by the constructor
-    assert 
records.get(ikey).getCurrentLocation().getFileId().equals(SpillableMapTestUtils.DUMMY_FILE_ID);
-    assert 
records.get(ikey).getCurrentLocation().getInstantTime().equals(SpillableMapTestUtils.DUMMY_COMMIT_TIME);
-
-    // test contains
-    assertTrue(records.containsKey(ikey));
-    assertTrue(records.containsKey(dkey));
-
-    // test isEmpty
-    assertFalse(records.isEmpty());
-
-    // test containsAll
-    assertTrue(records.keySet().containsAll(recordKeys));
-
-    // remove (from inMemory and onDisk)
-    HoodieRecord removedRecord = records.remove(ikey);
-    assertTrue(removedRecord != null);
-    assertFalse(records.containsKey(ikey));
-
-    removedRecord = records.remove(dkey);
-    assertTrue(removedRecord != null);
-    assertFalse(records.containsKey(dkey));
-
-    // test clear
-    records.clear();
-    assertTrue(records.size() == 0);
+      SchemaTestUtil testUtil = new SchemaTestUtil();
+      List<IndexedRecord> iRecords = testUtil.generateHoodieTestRecords(0, 
100);
+      // insert a bunch of records so that values spill to disk too
+      List<String> recordKeys = SpillableMapTestUtils.upsertRecords(iRecords, 
records);
+      IndexedRecord inMemoryRecord = iRecords.get(0);
+      String ikey = ((GenericRecord) 
inMemoryRecord).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
+      String iPartitionPath = ((GenericRecord) 
inMemoryRecord).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString();
+      HoodieRecord inMemoryHoodieRecord = new HoodieAvroRecord<>(new 
HoodieKey(ikey, iPartitionPath),
+          new HoodieAvroPayload(Option.of((GenericRecord) inMemoryRecord)));
+
+      IndexedRecord onDiskRecord = iRecords.get(99);
+      String dkey = ((GenericRecord) 
onDiskRecord).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
+      String dPartitionPath = ((GenericRecord) 
onDiskRecord).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString();
+      HoodieRecord onDiskHoodieRecord = new HoodieAvroRecord<>(new 
HoodieKey(dkey, dPartitionPath),
+          new HoodieAvroPayload(Option.of((GenericRecord) onDiskRecord)));
+      // assert size
+      assert records.size() == 100;
+      // get should return the same HoodieKey, same location and same value
+      assert inMemoryHoodieRecord.getKey().equals(records.get(ikey).getKey());
+      assert onDiskHoodieRecord.getKey().equals(records.get(dkey).getKey());
+      // compare the member variables of HoodieRecord not set by the 
constructor
+      assert 
records.get(ikey).getCurrentLocation().getFileId().equals(SpillableMapTestUtils.DUMMY_FILE_ID);
+      assert 
records.get(ikey).getCurrentLocation().getInstantTime().equals(SpillableMapTestUtils.DUMMY_COMMIT_TIME);
+
+      // test contains
+      assertTrue(records.containsKey(ikey));
+      assertTrue(records.containsKey(dkey));
+
+      // test isEmpty
+      assertFalse(records.isEmpty());
+
+      // test containsAll
+      assertTrue(records.keySet().containsAll(recordKeys));
+
+      // remove (from inMemory and onDisk)
+      HoodieRecord removedRecord = records.remove(ikey);
+      assertTrue(removedRecord != null);
+      assertFalse(records.containsKey(ikey));
+
+      removedRecord = records.remove(dkey);
+      assertTrue(removedRecord != null);
+      assertFalse(records.containsKey(dkey));
+
+      // test clear
+      records.clear();
+      assertTrue(records.size() == 0);
+    }
   }
 
   @ParameterizedTest
@@ -213,20 +217,21 @@ public class TestExternalSpillableMap extends HoodieCommonTestHarness {
   public void simpleTestWithException(ExternalSpillableMap.DiskMapType 
diskMapType, boolean isCompressionEnabled) throws IOException, 
URISyntaxException {
     Schema schema = 
HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
 
-    ExternalSpillableMap<String, HoodieRecord<? extends HoodieRecordPayload>> 
records = new ExternalSpillableMap<>(16L,
+    try (ExternalSpillableMap<String, HoodieRecord<? extends 
HoodieRecordPayload>> records = new ExternalSpillableMap<>(16L,
         failureOutputPath, new DefaultSizeEstimator(),
-        new HoodieRecordSizeEstimator(schema), diskMapType, 
isCompressionEnabled); // 16B
+        new HoodieRecordSizeEstimator(schema), diskMapType, 
isCompressionEnabled)) { // 16B
 
-    SchemaTestUtil testUtil = new SchemaTestUtil();
-    List<IndexedRecord> iRecords = testUtil.generateHoodieTestRecords(0, 100);
-    List<String> recordKeys = SpillableMapTestUtils.upsertRecords(iRecords, 
records);
-    assert (recordKeys.size() == 100);
-    Iterator<HoodieRecord<? extends HoodieRecordPayload>> itr = 
records.iterator();
-    assertThrows(IOException.class, () -> {
-      while (itr.hasNext()) {
-        throw new IOException("Testing failures...");
-      }
-    });
+      SchemaTestUtil testUtil = new SchemaTestUtil();
+      List<IndexedRecord> iRecords = testUtil.generateHoodieTestRecords(0, 
100);
+      List<String> recordKeys = SpillableMapTestUtils.upsertRecords(iRecords, 
records);
+      assert (recordKeys.size() == 100);
+      Iterator<HoodieRecord<? extends HoodieRecordPayload>> itr = 
records.iterator();
+      assertThrows(IOException.class, () -> {
+        while (itr.hasNext()) {
+          throw new IOException("Testing failures...");
+        }
+      });
+    }
   }
 
   @ParameterizedTest
@@ -237,52 +242,53 @@ public class TestExternalSpillableMap extends 
HoodieCommonTestHarness {
 
     Schema schema = 
HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
 
-    ExternalSpillableMap<String, HoodieRecord<? extends HoodieRecordPayload>> 
records =
+    try (ExternalSpillableMap<String, HoodieRecord<? extends 
HoodieRecordPayload>> records =
         new ExternalSpillableMap<>(16L, basePath, new DefaultSizeEstimator(),
-            new HoodieRecordSizeEstimator(schema), diskMapType, 
isCompressionEnabled); // 16B
+            new HoodieRecordSizeEstimator(schema), diskMapType, 
isCompressionEnabled)) { // 16B
+
+      List<String> recordKeys = new ArrayList<>();
+      // Ensure we spill to disk
+      while (records.getDiskBasedMapNumEntries() < 1) {
+        SchemaTestUtil testUtil = new SchemaTestUtil();
+        List<IndexedRecord> iRecords = testUtil.generateHoodieTestRecords(0, 
100);
+        recordKeys.addAll(SpillableMapTestUtils.upsertRecords(iRecords, 
records));
+      }
 
-    List<String> recordKeys = new ArrayList<>();
-    // Ensure we spill to disk
-    while (records.getDiskBasedMapNumEntries() < 1) {
-      SchemaTestUtil testUtil = new SchemaTestUtil();
-      List<IndexedRecord> iRecords = testUtil.generateHoodieTestRecords(0, 
100);
-      recordKeys.addAll(SpillableMapTestUtils.upsertRecords(iRecords, 
records));
+      // Get a record from the in-Memory map
+      String key = recordKeys.get(0);
+      HoodieAvroRecord record = (HoodieAvroRecord) records.get(key);
+      List<IndexedRecord> recordsToUpdate = new ArrayList<>();
+      recordsToUpdate.add((IndexedRecord) 
record.getData().getInsertValue(schema).get());
+
+      String newCommitTime = InProcessTimeGenerator.createNewInstantTime();
+      List<String> keysToBeUpdated = new ArrayList<>();
+      keysToBeUpdated.add(key);
+      // Update the instantTime for this record
+      List<IndexedRecord> updatedRecords =
+          SchemaTestUtil.updateHoodieTestRecords(keysToBeUpdated, 
recordsToUpdate, newCommitTime);
+      // Upsert this updated record
+      SpillableMapTestUtils.upsertRecords(updatedRecords, records);
+      GenericRecord gRecord = (GenericRecord) 
records.get(key).getData().getInsertValue(schema).get();
+      // The record returned for this key should have the updated commitTime
+      assert 
newCommitTime.contentEquals(gRecord.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString());
+
+      // Get a record from the disk based map
+      key = recordKeys.get(recordKeys.size() - 1);
+      record = (HoodieAvroRecord) records.get(key);
+      recordsToUpdate = new ArrayList<>();
+      recordsToUpdate.add((IndexedRecord) 
record.getData().getInsertValue(schema).get());
+
+      newCommitTime = InProcessTimeGenerator.createNewInstantTime();
+      keysToBeUpdated = new ArrayList<>();
+      keysToBeUpdated.add(key);
+      // Update the commitTime for this record
+      updatedRecords = SchemaTestUtil.updateHoodieTestRecords(keysToBeUpdated, 
recordsToUpdate, newCommitTime);
+      // Upsert this updated record
+      SpillableMapTestUtils.upsertRecords(updatedRecords, records);
+      gRecord = (GenericRecord) 
records.get(key).getData().getInsertValue(schema).get();
+      // The record returned for this key should have the updated instantTime
+      assert 
newCommitTime.contentEquals(gRecord.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString());
     }
-
-    // Get a record from the in-Memory map
-    String key = recordKeys.get(0);
-    HoodieAvroRecord record = (HoodieAvroRecord) records.get(key);
-    List<IndexedRecord> recordsToUpdate = new ArrayList<>();
-    recordsToUpdate.add((IndexedRecord) 
record.getData().getInsertValue(schema).get());
-
-    String newCommitTime = InProcessTimeGenerator.createNewInstantTime();
-    List<String> keysToBeUpdated = new ArrayList<>();
-    keysToBeUpdated.add(key);
-    // Update the instantTime for this record
-    List<IndexedRecord> updatedRecords =
-        SchemaTestUtil.updateHoodieTestRecords(keysToBeUpdated, 
recordsToUpdate, newCommitTime);
-    // Upsert this updated record
-    SpillableMapTestUtils.upsertRecords(updatedRecords, records);
-    GenericRecord gRecord = (GenericRecord) 
records.get(key).getData().getInsertValue(schema).get();
-    // The record returned for this key should have the updated commitTime
-    assert 
newCommitTime.contentEquals(gRecord.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString());
-
-    // Get a record from the disk based map
-    key = recordKeys.get(recordKeys.size() - 1);
-    record = (HoodieAvroRecord) records.get(key);
-    recordsToUpdate = new ArrayList<>();
-    recordsToUpdate.add((IndexedRecord) 
record.getData().getInsertValue(schema).get());
-
-    newCommitTime = InProcessTimeGenerator.createNewInstantTime();
-    keysToBeUpdated = new ArrayList<>();
-    keysToBeUpdated.add(key);
-    // Update the commitTime for this record
-    updatedRecords = SchemaTestUtil.updateHoodieTestRecords(keysToBeUpdated, 
recordsToUpdate, newCommitTime);
-    // Upsert this updated record
-    SpillableMapTestUtils.upsertRecords(updatedRecords, records);
-    gRecord = (GenericRecord) 
records.get(key).getData().getInsertValue(schema).get();
-    // The record returned for this key should have the updated instantTime
-    assert 
newCommitTime.contentEquals(gRecord.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString());
   }
 
   @ParameterizedTest
@@ -293,65 +299,66 @@ public class TestExternalSpillableMap extends 
HoodieCommonTestHarness {
 
     Schema schema = SchemaTestUtil.getSimpleSchema();
 
-    ExternalSpillableMap<String, HoodieRecord<? extends HoodieRecordPayload>> 
records =
+    try (ExternalSpillableMap<String, HoodieRecord<? extends 
HoodieRecordPayload>> records =
         new ExternalSpillableMap<>(16L, basePath, new DefaultSizeEstimator(),
-            new HoodieRecordSizeEstimator(schema), diskMapType, 
isCompressionEnabled); // 16B
+            new HoodieRecordSizeEstimator(schema), diskMapType, 
isCompressionEnabled)) { // 16B
 
-    SchemaTestUtil testUtil = new SchemaTestUtil();
-    List<String> recordKeys = new ArrayList<>();
+      SchemaTestUtil testUtil = new SchemaTestUtil();
+      List<String> recordKeys = new ArrayList<>();
+
+      // Ensure we spill to disk
+      while (records.getDiskBasedMapNumEntries() < 1) {
+        List<HoodieRecord> hoodieRecords = 
testUtil.generateHoodieTestRecordsWithoutHoodieMetadata(0, 100);
+        hoodieRecords.stream().forEach(r -> {
+          records.put(r.getRecordKey(), r);
+          recordKeys.add(r.getRecordKey());
+        });
+      }
 
-    // Ensure we spill to disk
-    while (records.getDiskBasedMapNumEntries() < 1) {
-      List<HoodieRecord> hoodieRecords = 
testUtil.generateHoodieTestRecordsWithoutHoodieMetadata(0, 100);
-      hoodieRecords.stream().forEach(r -> {
+      // Get a record from the in-Memory map
+      String key = recordKeys.get(0);
+      HoodieRecord record = records.get(key);
+      // Get the field we want to update
+      String fieldName = schema.getFields().stream().filter(field -> 
field.schema().getType() == Schema.Type.STRING)
+          .findAny().get().name();
+      // Use a new value to update this field
+      String newValue = "update1";
+      List<HoodieRecord> recordsToUpdate = new ArrayList<>();
+      recordsToUpdate.add(record);
+
+      List<HoodieRecord> updatedRecords =
+          
SchemaTestUtil.updateHoodieTestRecordsWithoutHoodieMetadata(recordsToUpdate, 
schema, fieldName, newValue);
+
+      // Upsert this updated record
+      updatedRecords.forEach(r -> {
         records.put(r.getRecordKey(), r);
-        recordKeys.add(r.getRecordKey());
       });
+      GenericRecord gRecord = (GenericRecord) 
records.get(key).getData().getInsertValue(schema).get();
+      // The record returned for this key should have the updated value for 
the field name
+      assertEquals(gRecord.get(fieldName).toString(), newValue);
+
+      // Get a record from the disk based map
+      key = recordKeys.get(recordKeys.size() - 1);
+      record = records.get(key);
+      // Get the field we want to update
+      fieldName = schema.getFields().stream().filter(field -> 
field.schema().getType() == Schema.Type.STRING).findAny()
+          .get().name();
+      // Use a new value to update this field
+      newValue = "update2";
+      recordsToUpdate = new ArrayList<>();
+      recordsToUpdate.add(record);
+
+      updatedRecords =
+          
SchemaTestUtil.updateHoodieTestRecordsWithoutHoodieMetadata(recordsToUpdate, 
schema, fieldName, newValue);
+
+      // Upsert this updated record
+      updatedRecords.forEach(r -> {
+        records.put(r.getRecordKey(), r);
+      });
+      gRecord = (GenericRecord) 
records.get(key).getData().getInsertValue(schema).get();
+      // The record returned for this key should have the updated value for 
the field name
+      assertEquals(gRecord.get(fieldName).toString(), newValue);
     }
-
-    // Get a record from the in-Memory map
-    String key = recordKeys.get(0);
-    HoodieRecord record = records.get(key);
-    // Get the field we want to update
-    String fieldName = schema.getFields().stream().filter(field -> 
field.schema().getType() == Schema.Type.STRING)
-        .findAny().get().name();
-    // Use a new value to update this field
-    String newValue = "update1";
-    List<HoodieRecord> recordsToUpdate = new ArrayList<>();
-    recordsToUpdate.add(record);
-
-    List<HoodieRecord> updatedRecords =
-        
SchemaTestUtil.updateHoodieTestRecordsWithoutHoodieMetadata(recordsToUpdate, 
schema, fieldName, newValue);
-
-    // Upsert this updated record
-    updatedRecords.forEach(r -> {
-      records.put(r.getRecordKey(), r);
-    });
-    GenericRecord gRecord = (GenericRecord) 
records.get(key).getData().getInsertValue(schema).get();
-    // The record returned for this key should have the updated value for the 
field name
-    assertEquals(gRecord.get(fieldName).toString(), newValue);
-
-    // Get a record from the disk based map
-    key = recordKeys.get(recordKeys.size() - 1);
-    record = records.get(key);
-    // Get the field we want to update
-    fieldName = schema.getFields().stream().filter(field -> 
field.schema().getType() == Schema.Type.STRING).findAny()
-        .get().name();
-    // Use a new value to update this field
-    newValue = "update2";
-    recordsToUpdate = new ArrayList<>();
-    recordsToUpdate.add(record);
-
-    updatedRecords =
-        
SchemaTestUtil.updateHoodieTestRecordsWithoutHoodieMetadata(recordsToUpdate, 
schema, fieldName, newValue);
-
-    // Upsert this updated record
-    updatedRecords.forEach(r -> {
-      records.put(r.getRecordKey(), r);
-    });
-    gRecord = (GenericRecord) 
records.get(key).getData().getInsertValue(schema).get();
-    // The record returned for this key should have the updated value for the 
field name
-    assertEquals(gRecord.get(fieldName).toString(), newValue);
   }
 
   @Test
@@ -360,29 +367,30 @@ public class TestExternalSpillableMap extends 
HoodieCommonTestHarness {
     final boolean isCompressionEnabled = false;
     final Schema schema = SchemaTestUtil.getSimpleSchema();
 
-    ExternalSpillableMap<String, HoodieRecord<? extends HoodieRecordPayload>> 
records =
+    try (ExternalSpillableMap<String, HoodieRecord<? extends 
HoodieRecordPayload>> records =
         new ExternalSpillableMap<>(16L, basePath, new DefaultSizeEstimator(),
-            new HoodieRecordSizeEstimator(schema), diskMapType, 
isCompressionEnabled);
+            new HoodieRecordSizeEstimator(schema), diskMapType, 
isCompressionEnabled)) {
 
-    List<String> recordKeys = new ArrayList<>();
-    SchemaTestUtil testUtil = new SchemaTestUtil();
+      List<String> recordKeys = new ArrayList<>();
+      SchemaTestUtil testUtil = new SchemaTestUtil();
 
-    // Put a single record. Payload size estimation happens as part of this 
initial put.
-    HoodieRecord seedRecord = 
testUtil.generateHoodieTestRecordsWithoutHoodieMetadata(0, 1).get(0);
-    records.put(seedRecord.getRecordKey(), seedRecord);
-
-    // Remove the key immediately to make the map empty again.
-    records.remove(seedRecord.getRecordKey());
-
-    // Verify payload size re-estimation does not throw exception
-    SchemaTestUtil testUtilx = new SchemaTestUtil();
-    List<HoodieRecord> hoodieRecords = 
testUtil.generateHoodieTestRecordsWithoutHoodieMetadata(0, 250);
-    hoodieRecords.stream().forEach(hoodieRecord -> {
-      assertDoesNotThrow(() -> {
-        records.put(hoodieRecord.getRecordKey(), hoodieRecord);
-      }, "ExternalSpillableMap put() should not throw exception!");
-      recordKeys.add(hoodieRecord.getRecordKey());
-    });
+      // Put a single record. Payload size estimation happens as part of this 
initial put.
+      HoodieRecord seedRecord = 
testUtil.generateHoodieTestRecordsWithoutHoodieMetadata(0, 1).get(0);
+      records.put(seedRecord.getRecordKey(), seedRecord);
+
+      // Remove the key immediately to make the map empty again.
+      records.remove(seedRecord.getRecordKey());
+
+      // Verify payload size re-estimation does not throw exception
+      SchemaTestUtil testUtilx = new SchemaTestUtil();
+      List<HoodieRecord> hoodieRecords = 
testUtil.generateHoodieTestRecordsWithoutHoodieMetadata(0, 250);
+      hoodieRecords.stream().forEach(hoodieRecord -> {
+        assertDoesNotThrow(() -> {
+          records.put(hoodieRecord.getRecordKey(), hoodieRecord);
+        }, "ExternalSpillableMap put() should not throw exception!");
+        recordKeys.add(hoodieRecord.getRecordKey());
+      });
+    }
   }
 
   @ParameterizedTest
@@ -408,28 +416,48 @@ public class TestExternalSpillableMap extends 
HoodieCommonTestHarness {
     // Estimate the first record size and calculate the total memory size that 
the in-memory map can only contain 100 records.
     long estimatedPayloadSize = keyEstimator.sizeEstimate(key) + 
valEstimator.sizeEstimate(record);
     long totalEstimatedSizeWith100Records = (long) ((estimatedPayloadSize * 
100) / 0.8);
-    ExternalSpillableMap<String, HoodieRecord<? extends HoodieRecordPayload>> 
records =
+    try (ExternalSpillableMap<String, HoodieRecord<? extends 
HoodieRecordPayload>> records =
         new ExternalSpillableMap<>(totalEstimatedSizeWith100Records, basePath, 
new DefaultSizeEstimator(),
-            new HoodieRecordSizeEstimator(schema), diskMapType, 
isCompressionEnabled);
-
-    // Insert 100 records and then in-memory map will contain 100 records.
-    SpillableMapTestUtils.upsertRecords(iRecords, records);
-
-    // Generate one record and it will be spilled to disk
-    List<IndexedRecord> singleRecord = testUtil.generateHoodieTestRecords(0, 
1);
-    List<String> singleRecordKey = 
SpillableMapTestUtils.upsertRecords(singleRecord, records);
-
-    // Get the field we want to update
-    String fieldName = schema.getFields().stream().filter(field -> 
field.schema().getType() == Schema.Type.STRING).findAny()
-        .get().name();
-    HoodieRecord hoodieRecord = records.get(singleRecordKey.get(0));
-    // Use a new value to update this field, the estimate size of this record 
will be less than the first record.
-    String newValue = "";
-    HoodieRecord updatedRecord =
-        
SchemaTestUtil.updateHoodieTestRecordsWithoutHoodieMetadata(Arrays.asList(hoodieRecord),
 schema, fieldName, newValue).get(0);
-    records.put(updatedRecord.getRecordKey(), updatedRecord);
-
-    assertEquals(records.size(), 101);
+            new HoodieRecordSizeEstimator(schema), diskMapType, 
isCompressionEnabled)) {
+
+      // Insert 100 records and then in-memory map will contain 100 records.
+      SpillableMapTestUtils.upsertRecords(iRecords, records);
+
+      // Generate one record and it will be spilled to disk
+      List<IndexedRecord> singleRecord = testUtil.generateHoodieTestRecords(0, 
1);
+      List<String> singleRecordKey = 
SpillableMapTestUtils.upsertRecords(singleRecord, records);
+
+      // Get the field we want to update
+      String fieldName = schema.getFields().stream().filter(field -> 
field.schema().getType() == Schema.Type.STRING).findAny()
+          .get().name();
+      HoodieRecord hoodieRecord = records.get(singleRecordKey.get(0));
+      // Use a new value to update this field, the estimate size of this 
record will be less than the first record.
+      String newValue = "";
+      HoodieRecord updatedRecord =
+          
SchemaTestUtil.updateHoodieTestRecordsWithoutHoodieMetadata(Arrays.asList(hoodieRecord),
 schema, fieldName, newValue).get(0);
+      records.put(updatedRecord.getRecordKey(), updatedRecord);
+
+      assertEquals(records.size(), 101);
+    }
+  }
+
+  @ParameterizedTest
+  @EnumSource(value = ExternalSpillableMap.DiskMapType.class)
+  void assertEmptyMapOperations(ExternalSpillableMap.DiskMapType diskMapType) 
throws IOException {
+    // validate that operations on an empty map work as expected
+    try (ExternalSpillableMap<String, HoodieRecord<? extends 
HoodieRecordPayload>> records =
+             new ExternalSpillableMap<>(10, basePath, new 
DefaultSizeEstimator(),
+                 new DefaultSizeEstimator<>(), diskMapType, false)) {
+      assertTrue(records.isEmpty());
+      assertFalse(records.containsKey("key"));
+      assertFalse(records.containsValue("value"));
+      assertTrue(records.keySet().isEmpty());
+      assertTrue(records.values().isEmpty());
+      assertTrue(records.entrySet().isEmpty());
+      assertEquals(0, records.valueStream().count());
+      assertEquals(0, records.size());
+      assertFalse(records.iterator().hasNext());
+    }
   }
 
   private static Stream<Arguments> testArguments() {
diff --git 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadSnapshotReader.java
 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadSnapshotReader.java
index a4dc9b0cc67..219d703e2e8 100644
--- 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadSnapshotReader.java
+++ 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadSnapshotReader.java
@@ -217,5 +217,8 @@ public class HoodieMergeOnReadSnapshotReader extends 
AbstractRealtimeRecordReade
     if (logRecordScanner != null) {
       logRecordScanner.close();
     }
+    if (mergedRecordsByKey != null) {
+      mergedRecordsByKey.close();
+    }
   }
 }

Reply via email to