This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 2b3d0fa4ffe [HUDI-7238] Bug fixes and optimization of
ExternalSpillableMap (#10344)
2b3d0fa4ffe is described below
commit 2b3d0fa4ffe5067bbef5025c5a6fcc257e26fb0f
Author: Tim Brown <[email protected]>
AuthorDate: Fri Jan 26 21:07:40 2024 -0600
[HUDI-7238] Bug fixes and optimization of ExternalSpillableMap (#10344)
---
.../java/org/apache/hudi/io/HoodieCDCLogger.java | 2 +-
.../java/org/apache/hudi/io/HoodieMergeHandle.java | 5 +-
.../view/SpillableMapBasedFileSystemView.java | 23 +
.../common/util/collection/BitCaskDiskMap.java | 5 +-
.../hudi/common/util/collection/DiskMap.java | 5 +-
.../util/collection/ExternalSpillableMap.java | 95 ++--
.../util/collection/TestExternalSpillableMap.java | 522 +++++++++++----------
.../realtime/HoodieMergeOnReadSnapshotReader.java | 3 +
8 files changed, 370 insertions(+), 290 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java
index 1e2fa7c59e4..9c78c5aae7c 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java
@@ -235,7 +235,7 @@ public class HoodieCDCLogger implements Closeable {
throw new HoodieIOException("Failed to close HoodieCDCLogger", e);
} finally {
// in case that crash when call `flushIfNeeded`, do the cleanup again.
- cdcData.clear();
+ cdcData.close();
}
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index 79c80e1a5af..ae1f1dd1da4 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -57,6 +57,7 @@ import org.slf4j.LoggerFactory;
import javax.annotation.concurrent.NotThreadSafe;
+import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
@@ -426,8 +427,8 @@ public class HoodieMergeHandle<T, I, K, O> extends
HoodieWriteHandle<T, I, K, O>
markClosed();
writeIncomingRecords();
- if (keyToNewRecords instanceof ExternalSpillableMap) {
- ((ExternalSpillableMap) keyToNewRecords).close();
+ if (keyToNewRecords instanceof Closeable) {
+ ((Closeable) keyToNewRecords).close();
}
keyToNewRecords = null;
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/SpillableMapBasedFileSystemView.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/SpillableMapBasedFileSystemView.java
index 75d29870a5a..95ba9dfd8f6 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/SpillableMapBasedFileSystemView.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/SpillableMapBasedFileSystemView.java
@@ -216,4 +216,27 @@ public class SpillableMapBasedFileSystemView extends
HoodieTableFileSystemView {
fileIdsToRemove.forEach(fileGroupId ->
fgIdToReplaceInstants.remove(fileGroupId));
}
+
+ @Override
+ public void close() {
+ super.close();
+ if (partitionToFileGroupsMap != null) {
+ ((ExternalSpillableMap) partitionToFileGroupsMap).close();
+ }
+ if (fgIdToPendingClustering != null) {
+ ((ExternalSpillableMap) fgIdToPendingClustering).close();
+ }
+ if (fgIdToPendingCompaction != null) {
+ ((ExternalSpillableMap) fgIdToPendingCompaction).close();
+ }
+ if (fgIdToPendingLogCompaction != null) {
+ ((ExternalSpillableMap) fgIdToPendingLogCompaction).close();
+ }
+ if (fgIdToBootstrapBaseFile != null) {
+ ((ExternalSpillableMap) fgIdToBootstrapBaseFile).close();
+ }
+ if (fgIdToReplaceInstants != null) {
+ ((ExternalSpillableMap) fgIdToReplaceInstants).close();
+ }
+ }
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/BitCaskDiskMap.java
b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/BitCaskDiskMap.java
index 56787b0c3d1..2f1595bfe1d 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/BitCaskDiskMap.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/BitCaskDiskMap.java
@@ -218,8 +218,8 @@ public final class BitCaskDiskMap<T extends Serializable, R
extends Serializable
try {
byte[] val = isCompressionEnabled ?
DISK_COMPRESSION_REF.get().compressBytes(SerializationUtils.serialize(value)) :
SerializationUtils.serialize(value);
- Integer valueSize = val.length;
- Long timestamp = System.currentTimeMillis();
+ int valueSize = val.length;
+ long timestamp = System.currentTimeMillis();
this.valueMetadataMap.put(key,
new BitCaskDiskMap.ValueMetadata(this.filePath, valueSize,
filePosition.get(), timestamp));
byte[] serializedKey = SerializationUtils.serialize(key);
@@ -271,6 +271,7 @@ public final class BitCaskDiskMap<T extends Serializable, R
extends Serializable
fileOutputStream.getChannel().force(false);
writeOnlyFileHandle.close();
}
+ fileOutputStream.close();
while (!openedAccessFiles.isEmpty()) {
BufferedRandomAccessFile file = openedAccessFiles.poll();
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/DiskMap.java
b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/DiskMap.java
index 31a34148592..c8d57aec032 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/DiskMap.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/DiskMap.java
@@ -63,7 +63,10 @@ public abstract class DiskMap<T extends Serializable, R
extends Serializable> im
* (typically 4 KB) to disk.
*/
private void addShutDownHook() {
- shutdownThread = new Thread(this::cleanup);
+ shutdownThread = new Thread(() -> {
+ LOG.warn("Failed to properly close DiskMap in application");
+ cleanup();
+ });
Runtime.getRuntime().addShutdownHook(shutdownThread);
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java
b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java
index 3d5fd1d5754..5df8f97d4b9 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java
@@ -26,6 +26,7 @@ import org.slf4j.LoggerFactory;
import javax.annotation.concurrent.NotThreadSafe;
+import java.io.Closeable;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
@@ -54,7 +55,7 @@ import java.util.stream.Stream;
* frequently and incur unnecessary disk writes.
*/
@NotThreadSafe
-public class ExternalSpillableMap<T extends Serializable, R extends
Serializable> implements Map<T, R>, Serializable {
+public class ExternalSpillableMap<T extends Serializable, R extends
Serializable> implements Map<T, R>, Serializable, Closeable {
// Find the actual estimated payload size after inserting N records
private static final int NUMBER_OF_RECORDS_TO_ESTIMATE_PAYLOAD_SIZE = 100;
@@ -67,7 +68,7 @@ public class ExternalSpillableMap<T extends Serializable, R
extends Serializable
private transient volatile DiskMap<T, R> diskBasedMap;
// TODO(na) : a dynamic sizing factor to ensure we have space for other
objects in memory and
// incorrect payload estimation
- private final Double sizingFactorForInMemoryMap = 0.8;
+ private static final double SIZING_FACTOR_FOR_IN_MEMORY_MAP = 0.8;
// Size Estimator for key type
private final SizeEstimator<T> keySizeEstimator;
// Size Estimator for key types
@@ -77,27 +78,27 @@ public class ExternalSpillableMap<T extends Serializable, R
extends Serializable
// Enables compression of values stored in disc
private final boolean isCompressionEnabled;
// current space occupied by this map in-memory
- private Long currentInMemoryMapSize;
+ private long currentInMemoryMapSize;
// An estimate of the size of each payload written to this map
private volatile long estimatedPayloadSize = 0;
// Base File Path
private final String baseFilePath;
- public ExternalSpillableMap(Long maxInMemorySizeInBytes, String
baseFilePath, SizeEstimator<T> keySizeEstimator,
+ public ExternalSpillableMap(long maxInMemorySizeInBytes, String
baseFilePath, SizeEstimator<T> keySizeEstimator,
SizeEstimator<R> valueSizeEstimator) throws
IOException {
this(maxInMemorySizeInBytes, baseFilePath, keySizeEstimator,
valueSizeEstimator, DiskMapType.BITCASK);
}
- public ExternalSpillableMap(Long maxInMemorySizeInBytes, String
baseFilePath, SizeEstimator<T> keySizeEstimator,
+ public ExternalSpillableMap(long maxInMemorySizeInBytes, String
baseFilePath, SizeEstimator<T> keySizeEstimator,
SizeEstimator<R> valueSizeEstimator, DiskMapType
diskMapType) throws IOException {
this(maxInMemorySizeInBytes, baseFilePath, keySizeEstimator,
valueSizeEstimator, diskMapType, false);
}
- public ExternalSpillableMap(Long maxInMemorySizeInBytes, String
baseFilePath, SizeEstimator<T> keySizeEstimator,
+ public ExternalSpillableMap(long maxInMemorySizeInBytes, String
baseFilePath, SizeEstimator<T> keySizeEstimator,
SizeEstimator<R> valueSizeEstimator, DiskMapType
diskMapType, boolean isCompressionEnabled) throws IOException {
this.inMemoryMap = new HashMap<>();
this.baseFilePath = baseFilePath;
- this.maxInMemorySizeInBytes = (long) Math.floor(maxInMemorySizeInBytes *
sizingFactorForInMemoryMap);
+ this.maxInMemorySizeInBytes = (long) Math.floor(maxInMemorySizeInBytes *
SIZING_FACTOR_FOR_IN_MEMORY_MAP);
this.currentInMemoryMapSize = 0L;
this.keySizeEstimator = keySizeEstimator;
this.valueSizeEstimator = valueSizeEstimator;
@@ -105,7 +106,7 @@ public class ExternalSpillableMap<T extends Serializable, R
extends Serializable
this.isCompressionEnabled = isCompressionEnabled;
}
- private DiskMap<T, R> getDiskBasedMap() {
+ private void initDiskBasedMap() {
if (null == diskBasedMap) {
synchronized (this) {
if (null == diskBasedMap) {
@@ -116,7 +117,7 @@ public class ExternalSpillableMap<T extends Serializable, R
extends Serializable
break;
case BITCASK:
default:
- diskBasedMap = new BitCaskDiskMap<>(baseFilePath,
isCompressionEnabled);
+ diskBasedMap = new BitCaskDiskMap<>(baseFilePath,
isCompressionEnabled);
}
} catch (IOException e) {
throw new HoodieIOException(e.getMessage(), e);
@@ -124,28 +125,28 @@ public class ExternalSpillableMap<T extends Serializable,
R extends Serializable
}
}
}
- return diskBasedMap;
}
/**
* A custom iterator to wrap over iterating in-memory + disk spilled data.
*/
public Iterator<R> iterator() {
- return new IteratorWrapper<>(inMemoryMap.values().iterator(),
getDiskBasedMap().iterator());
+
+ return diskBasedMap == null ? inMemoryMap.values().iterator() : new
IteratorWrapper<>(inMemoryMap.values().iterator(), diskBasedMap.iterator());
}
/**
* Number of entries in BitCaskDiskMap.
*/
public int getDiskBasedMapNumEntries() {
- return getDiskBasedMap().size();
+ return diskBasedMap == null ? 0 : diskBasedMap.size();
}
/**
* Number of bytes spilled to disk.
*/
public long getSizeOfFileOnDiskInBytes() {
- return getDiskBasedMap().sizeOfFileOnDiskInBytes();
+ return diskBasedMap == null ? 0 : diskBasedMap.sizeOfFileOnDiskInBytes();
}
/**
@@ -164,38 +165,38 @@ public class ExternalSpillableMap<T extends Serializable,
R extends Serializable
@Override
public int size() {
- return inMemoryMap.size() + getDiskBasedMap().size();
+ return inMemoryMap.size() + getDiskBasedMapNumEntries();
}
@Override
public boolean isEmpty() {
- return inMemoryMap.isEmpty() && getDiskBasedMap().isEmpty();
+ return inMemoryMap.isEmpty() && getDiskBasedMapNumEntries() == 0;
}
@Override
public boolean containsKey(Object key) {
- return inMemoryMap.containsKey(key) || getDiskBasedMap().containsKey(key);
+ return inMemoryMap.containsKey(key) || inDiskContainsKey(key);
}
@Override
public boolean containsValue(Object value) {
- return inMemoryMap.containsValue(value) ||
getDiskBasedMap().containsValue(value);
+ return inMemoryMap.containsValue(value) || (diskBasedMap != null &&
diskBasedMap.containsValue(value));
}
- public boolean inMemoryContainsKey(Object key) {
+ private boolean inMemoryContainsKey(Object key) {
return inMemoryMap.containsKey(key);
}
- public boolean inDiskContainsKey(Object key) {
- return getDiskBasedMap().containsKey(key);
+ private boolean inDiskContainsKey(Object key) {
+ return diskBasedMap != null && diskBasedMap.containsKey(key);
}
@Override
public R get(Object key) {
if (inMemoryMap.containsKey(key)) {
return inMemoryMap.get(key);
- } else if (getDiskBasedMap().containsKey(key)) {
- return getDiskBasedMap().get(key);
+ } else if (inDiskContainsKey(key)) {
+ return diskBasedMap.get(key);
}
return null;
}
@@ -217,11 +218,14 @@ public class ExternalSpillableMap<T extends Serializable,
R extends Serializable
this.currentInMemoryMapSize += this.estimatedPayloadSize;
// Remove the old version of the record from disk first to avoid data
duplication.
if (inDiskContainsKey(key)) {
- getDiskBasedMap().remove(key);
+ diskBasedMap.remove(key);
}
this.inMemoryMap.put(key, value);
} else {
- getDiskBasedMap().put(key, value);
+ if (diskBasedMap == null) {
+ initDiskBasedMap();
+ }
+ diskBasedMap.put(key, value);
}
return value;
}
@@ -232,8 +236,8 @@ public class ExternalSpillableMap<T extends Serializable, R
extends Serializable
if (inMemoryMap.containsKey(key)) {
currentInMemoryMapSize -= estimatedPayloadSize;
return inMemoryMap.remove(key);
- } else if (getDiskBasedMap().containsKey(key)) {
- return getDiskBasedMap().remove(key);
+ } else if (inDiskContainsKey(key)) {
+ return diskBasedMap.remove(key);
}
return null;
}
@@ -248,43 +252,60 @@ public class ExternalSpillableMap<T extends Serializable,
R extends Serializable
@Override
public void clear() {
inMemoryMap.clear();
- getDiskBasedMap().clear();
+ if (diskBasedMap != null) {
+ diskBasedMap.clear();
+ }
currentInMemoryMapSize = 0L;
}
public void close() {
inMemoryMap.clear();
- getDiskBasedMap().close();
+ if (diskBasedMap != null) {
+ diskBasedMap.close();
+ }
currentInMemoryMapSize = 0L;
}
@Override
public Set<T> keySet() {
- Set<T> keySet = new HashSet<T>();
+ if (diskBasedMap == null) {
+ return inMemoryMap.keySet();
+ }
+ Set<T> keySet = new HashSet<>(inMemoryMap.size() + diskBasedMap.size());
keySet.addAll(inMemoryMap.keySet());
- keySet.addAll(getDiskBasedMap().keySet());
+ keySet.addAll(diskBasedMap.keySet());
return keySet;
}
@Override
public Collection<R> values() {
- if (getDiskBasedMap().isEmpty()) {
+ if (diskBasedMap == null) {
return inMemoryMap.values();
}
- List<R> result = new ArrayList<>(inMemoryMap.values());
- result.addAll(getDiskBasedMap().values());
+ List<R> result = new ArrayList<>(inMemoryMap.size() + diskBasedMap.size());
+ result.addAll(inMemoryMap.values());
+ result.addAll(diskBasedMap.values());
return result;
}
public Stream<R> valueStream() {
- return Stream.concat(inMemoryMap.values().stream(),
getDiskBasedMap().valueStream());
+ if (diskBasedMap == null) {
+ return inMemoryMap.values().stream();
+ }
+ return Stream.concat(inMemoryMap.values().stream(),
diskBasedMap.valueStream());
}
@Override
public Set<Entry<T, R>> entrySet() {
- Set<Entry<T, R>> entrySet = new HashSet<>();
- entrySet.addAll(inMemoryMap.entrySet());
- entrySet.addAll(getDiskBasedMap().entrySet());
+ if (diskBasedMap == null) {
+ return inMemoryMap.entrySet();
+ }
+ Set<Entry<T, R>> inMemory = inMemoryMap.entrySet();
+ Set<Entry<T, R>> onDisk = diskBasedMap.entrySet();
+
+ Set<Entry<T, R>> entrySet = new HashSet<>(inMemory.size() + onDisk.size());
+ entrySet.addAll(inMemory);
+ entrySet.addAll(onDisk);
return entrySet;
}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestExternalSpillableMap.java
b/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestExternalSpillableMap.java
index 2b651ecfd9c..ed64ee00287 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestExternalSpillableMap.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestExternalSpillableMap.java
@@ -43,6 +43,7 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;
import java.io.IOException;
@@ -81,33 +82,34 @@ public class TestExternalSpillableMap extends
HoodieCommonTestHarness {
public void simpleInsertTest(ExternalSpillableMap.DiskMapType diskMapType,
boolean isCompressionEnabled) throws IOException, URISyntaxException {
Schema schema =
HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
- ExternalSpillableMap<String, HoodieRecord<? extends HoodieRecordPayload>>
records =
+ try (ExternalSpillableMap<String, HoodieRecord<? extends
HoodieRecordPayload>> records =
new ExternalSpillableMap<>(16L, basePath, new DefaultSizeEstimator(),
- new HoodieRecordSizeEstimator(schema), diskMapType,
isCompressionEnabled); // 16B
+ new HoodieRecordSizeEstimator(schema), diskMapType,
isCompressionEnabled)) { // 16B
- SchemaTestUtil testUtil = new SchemaTestUtil();
- List<IndexedRecord> iRecords = testUtil.generateHoodieTestRecords(0, 100);
- List<String> recordKeys = SpillableMapTestUtils.upsertRecords(iRecords,
records);
- assert (recordKeys.size() == 100);
-
- // Test iterator
- Iterator<HoodieRecord<? extends HoodieRecordPayload>> itr =
records.iterator();
- int cntSize = 0;
- while (itr.hasNext()) {
- HoodieRecord<? extends HoodieRecordPayload> rec = itr.next();
- cntSize++;
- assert recordKeys.contains(rec.getRecordKey());
- }
- assertEquals(recordKeys.size(), cntSize);
-
- // Test value stream
- List<HoodieRecord<? extends HoodieRecordPayload>> values =
records.valueStream().collect(Collectors.toList());
- cntSize = 0;
- for (HoodieRecord value : values) {
- assert recordKeys.contains(value.getRecordKey());
- cntSize++;
+ SchemaTestUtil testUtil = new SchemaTestUtil();
+ List<IndexedRecord> iRecords = testUtil.generateHoodieTestRecords(0,
100);
+ List<String> recordKeys = SpillableMapTestUtils.upsertRecords(iRecords,
records);
+ assert (recordKeys.size() == 100);
+
+ // Test iterator
+ Iterator<HoodieRecord<? extends HoodieRecordPayload>> itr =
records.iterator();
+ int cntSize = 0;
+ while (itr.hasNext()) {
+ HoodieRecord<? extends HoodieRecordPayload> rec = itr.next();
+ cntSize++;
+ assert recordKeys.contains(rec.getRecordKey());
+ }
+ assertEquals(recordKeys.size(), cntSize);
+
+ // Test value stream
+ List<HoodieRecord<? extends HoodieRecordPayload>> values =
records.valueStream().collect(Collectors.toList());
+ cntSize = 0;
+ for (HoodieRecord value : values) {
+ assert recordKeys.contains(value.getRecordKey());
+ cntSize++;
+ }
+ assertEquals(recordKeys.size(), cntSize);
}
- assertEquals(recordKeys.size(), cntSize);
}
@ParameterizedTest
@@ -115,38 +117,39 @@ public class TestExternalSpillableMap extends
HoodieCommonTestHarness {
public void testSimpleUpsert(ExternalSpillableMap.DiskMapType diskMapType,
boolean isCompressionEnabled) throws IOException, URISyntaxException {
Schema schema =
HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
- ExternalSpillableMap<String, HoodieRecord<? extends HoodieRecordPayload>>
records =
+ try (ExternalSpillableMap<String, HoodieRecord<? extends
HoodieRecordPayload>> records =
new ExternalSpillableMap<>(16L, basePath, new DefaultSizeEstimator(),
- new HoodieRecordSizeEstimator(schema), diskMapType,
isCompressionEnabled); // 16B
+ new HoodieRecordSizeEstimator(schema), diskMapType,
isCompressionEnabled)) { // 16B
- SchemaTestUtil testUtil = new SchemaTestUtil();
- List<IndexedRecord> iRecords = testUtil.generateHoodieTestRecords(0, 100);
- List<String> recordKeys = SpillableMapTestUtils.upsertRecords(iRecords,
records);
- assert (recordKeys.size() == 100);
- Iterator<HoodieRecord<? extends HoodieRecordPayload>> itr =
records.iterator();
- while (itr.hasNext()) {
- HoodieRecord<? extends HoodieRecordPayload> rec = itr.next();
- assert recordKeys.contains(rec.getRecordKey());
- }
+ SchemaTestUtil testUtil = new SchemaTestUtil();
+ List<IndexedRecord> iRecords = testUtil.generateHoodieTestRecords(0,
100);
+ List<String> recordKeys = SpillableMapTestUtils.upsertRecords(iRecords,
records);
+ assert (recordKeys.size() == 100);
+ Iterator<HoodieRecord<? extends HoodieRecordPayload>> itr =
records.iterator();
+ while (itr.hasNext()) {
+ HoodieRecord<? extends HoodieRecordPayload> rec = itr.next();
+ assert recordKeys.contains(rec.getRecordKey());
+ }
- List<IndexedRecord> updatedRecords =
SchemaTestUtil.updateHoodieTestRecords(recordKeys,
- testUtil.generateHoodieTestRecords(0, 100),
InProcessTimeGenerator.createNewInstantTime());
+ List<IndexedRecord> updatedRecords =
SchemaTestUtil.updateHoodieTestRecords(recordKeys,
+ testUtil.generateHoodieTestRecords(0, 100),
InProcessTimeGenerator.createNewInstantTime());
- // update records already inserted
- SpillableMapTestUtils.upsertRecords(updatedRecords, records);
+ // update records already inserted
+ SpillableMapTestUtils.upsertRecords(updatedRecords, records);
- // make sure we have records spilled to disk
- assertTrue(records.getDiskBasedMapNumEntries() > 0);
+ // make sure we have records spilled to disk
+ assertTrue(records.getDiskBasedMapNumEntries() > 0);
- // iterate over the updated records and compare the value from Map
- updatedRecords.forEach(record -> {
- HoodieRecord rec = records.get(((GenericRecord)
record).get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
- try {
- assertEquals(((HoodieAvroRecord)
rec).getData().getInsertValue(schema).get(), record);
- } catch (IOException io) {
- throw new UncheckedIOException(io);
- }
- });
+ // iterate over the updated records and compare the value from Map
+ updatedRecords.forEach(record -> {
+ HoodieRecord rec = records.get(((GenericRecord)
record).get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
+ try {
+ assertEquals(((HoodieAvroRecord)
rec).getData().getInsertValue(schema).get(), record);
+ } catch (IOException io) {
+ throw new UncheckedIOException(io);
+ }
+ });
+ }
}
@ParameterizedTest
@@ -156,56 +159,57 @@ public class TestExternalSpillableMap extends
HoodieCommonTestHarness {
Schema schema =
HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
String payloadClazz = HoodieAvroPayload.class.getName();
- ExternalSpillableMap<String, HoodieRecord<? extends HoodieRecordPayload>>
records =
+ try (ExternalSpillableMap<String, HoodieRecord<? extends
HoodieRecordPayload>> records =
new ExternalSpillableMap<>(16L, basePath, new DefaultSizeEstimator(),
- new HoodieRecordSizeEstimator(schema), diskMapType,
isCompressionEnabled); // 16B
+ new HoodieRecordSizeEstimator(schema), diskMapType,
isCompressionEnabled)) { // 16B
- SchemaTestUtil testUtil = new SchemaTestUtil();
- List<IndexedRecord> iRecords = testUtil.generateHoodieTestRecords(0, 100);
- // insert a bunch of records so that values spill to disk too
- List<String> recordKeys = SpillableMapTestUtils.upsertRecords(iRecords,
records);
- IndexedRecord inMemoryRecord = iRecords.get(0);
- String ikey = ((GenericRecord)
inMemoryRecord).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
- String iPartitionPath = ((GenericRecord)
inMemoryRecord).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString();
- HoodieRecord inMemoryHoodieRecord = new HoodieAvroRecord<>(new
HoodieKey(ikey, iPartitionPath),
- new HoodieAvroPayload(Option.of((GenericRecord) inMemoryRecord)));
-
- IndexedRecord onDiskRecord = iRecords.get(99);
- String dkey = ((GenericRecord)
onDiskRecord).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
- String dPartitionPath = ((GenericRecord)
onDiskRecord).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString();
- HoodieRecord onDiskHoodieRecord = new HoodieAvroRecord<>(new
HoodieKey(dkey, dPartitionPath),
- new HoodieAvroPayload(Option.of((GenericRecord) onDiskRecord)));
- // assert size
- assert records.size() == 100;
- // get should return the same HoodieKey, same location and same value
- assert inMemoryHoodieRecord.getKey().equals(records.get(ikey).getKey());
- assert onDiskHoodieRecord.getKey().equals(records.get(dkey).getKey());
- // compare the member variables of HoodieRecord not set by the constructor
- assert
records.get(ikey).getCurrentLocation().getFileId().equals(SpillableMapTestUtils.DUMMY_FILE_ID);
- assert
records.get(ikey).getCurrentLocation().getInstantTime().equals(SpillableMapTestUtils.DUMMY_COMMIT_TIME);
-
- // test contains
- assertTrue(records.containsKey(ikey));
- assertTrue(records.containsKey(dkey));
-
- // test isEmpty
- assertFalse(records.isEmpty());
-
- // test containsAll
- assertTrue(records.keySet().containsAll(recordKeys));
-
- // remove (from inMemory and onDisk)
- HoodieRecord removedRecord = records.remove(ikey);
- assertTrue(removedRecord != null);
- assertFalse(records.containsKey(ikey));
-
- removedRecord = records.remove(dkey);
- assertTrue(removedRecord != null);
- assertFalse(records.containsKey(dkey));
-
- // test clear
- records.clear();
- assertTrue(records.size() == 0);
+ SchemaTestUtil testUtil = new SchemaTestUtil();
+ List<IndexedRecord> iRecords = testUtil.generateHoodieTestRecords(0,
100);
+ // insert a bunch of records so that values spill to disk too
+ List<String> recordKeys = SpillableMapTestUtils.upsertRecords(iRecords,
records);
+ IndexedRecord inMemoryRecord = iRecords.get(0);
+ String ikey = ((GenericRecord)
inMemoryRecord).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
+ String iPartitionPath = ((GenericRecord)
inMemoryRecord).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString();
+ HoodieRecord inMemoryHoodieRecord = new HoodieAvroRecord<>(new
HoodieKey(ikey, iPartitionPath),
+ new HoodieAvroPayload(Option.of((GenericRecord) inMemoryRecord)));
+
+ IndexedRecord onDiskRecord = iRecords.get(99);
+ String dkey = ((GenericRecord)
onDiskRecord).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
+ String dPartitionPath = ((GenericRecord)
onDiskRecord).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString();
+ HoodieRecord onDiskHoodieRecord = new HoodieAvroRecord<>(new
HoodieKey(dkey, dPartitionPath),
+ new HoodieAvroPayload(Option.of((GenericRecord) onDiskRecord)));
+ // assert size
+ assert records.size() == 100;
+ // get should return the same HoodieKey, same location and same value
+ assert inMemoryHoodieRecord.getKey().equals(records.get(ikey).getKey());
+ assert onDiskHoodieRecord.getKey().equals(records.get(dkey).getKey());
+ // compare the member variables of HoodieRecord not set by the
constructor
+ assert
records.get(ikey).getCurrentLocation().getFileId().equals(SpillableMapTestUtils.DUMMY_FILE_ID);
+ assert
records.get(ikey).getCurrentLocation().getInstantTime().equals(SpillableMapTestUtils.DUMMY_COMMIT_TIME);
+
+ // test contains
+ assertTrue(records.containsKey(ikey));
+ assertTrue(records.containsKey(dkey));
+
+ // test isEmpty
+ assertFalse(records.isEmpty());
+
+ // test containsAll
+ assertTrue(records.keySet().containsAll(recordKeys));
+
+ // remove (from inMemory and onDisk)
+ HoodieRecord removedRecord = records.remove(ikey);
+ assertTrue(removedRecord != null);
+ assertFalse(records.containsKey(ikey));
+
+ removedRecord = records.remove(dkey);
+ assertTrue(removedRecord != null);
+ assertFalse(records.containsKey(dkey));
+
+ // test clear
+ records.clear();
+ assertTrue(records.size() == 0);
+ }
}
@ParameterizedTest
@@ -213,20 +217,21 @@ public class TestExternalSpillableMap extends
HoodieCommonTestHarness {
public void simpleTestWithException(ExternalSpillableMap.DiskMapType
diskMapType, boolean isCompressionEnabled) throws IOException,
URISyntaxException {
Schema schema =
HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
- ExternalSpillableMap<String, HoodieRecord<? extends HoodieRecordPayload>>
records = new ExternalSpillableMap<>(16L,
+ try (ExternalSpillableMap<String, HoodieRecord<? extends
HoodieRecordPayload>> records = new ExternalSpillableMap<>(16L,
failureOutputPath, new DefaultSizeEstimator(),
- new HoodieRecordSizeEstimator(schema), diskMapType,
isCompressionEnabled); // 16B
+ new HoodieRecordSizeEstimator(schema), diskMapType,
isCompressionEnabled)) { // 16B
- SchemaTestUtil testUtil = new SchemaTestUtil();
- List<IndexedRecord> iRecords = testUtil.generateHoodieTestRecords(0, 100);
- List<String> recordKeys = SpillableMapTestUtils.upsertRecords(iRecords,
records);
- assert (recordKeys.size() == 100);
- Iterator<HoodieRecord<? extends HoodieRecordPayload>> itr =
records.iterator();
- assertThrows(IOException.class, () -> {
- while (itr.hasNext()) {
- throw new IOException("Testing failures...");
- }
- });
+ SchemaTestUtil testUtil = new SchemaTestUtil();
+ List<IndexedRecord> iRecords = testUtil.generateHoodieTestRecords(0,
100);
+ List<String> recordKeys = SpillableMapTestUtils.upsertRecords(iRecords,
records);
+ assert (recordKeys.size() == 100);
+ Iterator<HoodieRecord<? extends HoodieRecordPayload>> itr =
records.iterator();
+ assertThrows(IOException.class, () -> {
+ while (itr.hasNext()) {
+ throw new IOException("Testing failures...");
+ }
+ });
+ }
}
@ParameterizedTest
@@ -237,52 +242,53 @@ public class TestExternalSpillableMap extends
HoodieCommonTestHarness {
Schema schema =
HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
- ExternalSpillableMap<String, HoodieRecord<? extends HoodieRecordPayload>>
records =
+ try (ExternalSpillableMap<String, HoodieRecord<? extends
HoodieRecordPayload>> records =
new ExternalSpillableMap<>(16L, basePath, new DefaultSizeEstimator(),
- new HoodieRecordSizeEstimator(schema), diskMapType,
isCompressionEnabled); // 16B
+ new HoodieRecordSizeEstimator(schema), diskMapType,
isCompressionEnabled)) { // 16B
+
+ List<String> recordKeys = new ArrayList<>();
+ // Ensure we spill to disk
+ while (records.getDiskBasedMapNumEntries() < 1) {
+ SchemaTestUtil testUtil = new SchemaTestUtil();
+ List<IndexedRecord> iRecords = testUtil.generateHoodieTestRecords(0,
100);
+ recordKeys.addAll(SpillableMapTestUtils.upsertRecords(iRecords,
records));
+ }
- List<String> recordKeys = new ArrayList<>();
- // Ensure we spill to disk
- while (records.getDiskBasedMapNumEntries() < 1) {
- SchemaTestUtil testUtil = new SchemaTestUtil();
- List<IndexedRecord> iRecords = testUtil.generateHoodieTestRecords(0,
100);
- recordKeys.addAll(SpillableMapTestUtils.upsertRecords(iRecords,
records));
+ // Get a record from the in-Memory map
+ String key = recordKeys.get(0);
+ HoodieAvroRecord record = (HoodieAvroRecord) records.get(key);
+ List<IndexedRecord> recordsToUpdate = new ArrayList<>();
+ recordsToUpdate.add((IndexedRecord)
record.getData().getInsertValue(schema).get());
+
+ String newCommitTime = InProcessTimeGenerator.createNewInstantTime();
+ List<String> keysToBeUpdated = new ArrayList<>();
+ keysToBeUpdated.add(key);
+ // Update the instantTime for this record
+ List<IndexedRecord> updatedRecords =
+ SchemaTestUtil.updateHoodieTestRecords(keysToBeUpdated,
recordsToUpdate, newCommitTime);
+ // Upsert this updated record
+ SpillableMapTestUtils.upsertRecords(updatedRecords, records);
+ GenericRecord gRecord = (GenericRecord)
records.get(key).getData().getInsertValue(schema).get();
+ // The record returned for this key should have the updated commitTime
+ assert
newCommitTime.contentEquals(gRecord.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString());
+
+ // Get a record from the disk based map
+ key = recordKeys.get(recordKeys.size() - 1);
+ record = (HoodieAvroRecord) records.get(key);
+ recordsToUpdate = new ArrayList<>();
+ recordsToUpdate.add((IndexedRecord)
record.getData().getInsertValue(schema).get());
+
+ newCommitTime = InProcessTimeGenerator.createNewInstantTime();
+ keysToBeUpdated = new ArrayList<>();
+ keysToBeUpdated.add(key);
+ // Update the commitTime for this record
+ updatedRecords = SchemaTestUtil.updateHoodieTestRecords(keysToBeUpdated,
recordsToUpdate, newCommitTime);
+ // Upsert this updated record
+ SpillableMapTestUtils.upsertRecords(updatedRecords, records);
+ gRecord = (GenericRecord)
records.get(key).getData().getInsertValue(schema).get();
+ // The record returned for this key should have the updated instantTime
+ assert
newCommitTime.contentEquals(gRecord.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString());
}
-
- // Get a record from the in-Memory map
- String key = recordKeys.get(0);
- HoodieAvroRecord record = (HoodieAvroRecord) records.get(key);
- List<IndexedRecord> recordsToUpdate = new ArrayList<>();
- recordsToUpdate.add((IndexedRecord)
record.getData().getInsertValue(schema).get());
-
- String newCommitTime = InProcessTimeGenerator.createNewInstantTime();
- List<String> keysToBeUpdated = new ArrayList<>();
- keysToBeUpdated.add(key);
- // Update the instantTime for this record
- List<IndexedRecord> updatedRecords =
- SchemaTestUtil.updateHoodieTestRecords(keysToBeUpdated,
recordsToUpdate, newCommitTime);
- // Upsert this updated record
- SpillableMapTestUtils.upsertRecords(updatedRecords, records);
- GenericRecord gRecord = (GenericRecord)
records.get(key).getData().getInsertValue(schema).get();
- // The record returned for this key should have the updated commitTime
- assert
newCommitTime.contentEquals(gRecord.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString());
-
- // Get a record from the disk based map
- key = recordKeys.get(recordKeys.size() - 1);
- record = (HoodieAvroRecord) records.get(key);
- recordsToUpdate = new ArrayList<>();
- recordsToUpdate.add((IndexedRecord)
record.getData().getInsertValue(schema).get());
-
- newCommitTime = InProcessTimeGenerator.createNewInstantTime();
- keysToBeUpdated = new ArrayList<>();
- keysToBeUpdated.add(key);
- // Update the commitTime for this record
- updatedRecords = SchemaTestUtil.updateHoodieTestRecords(keysToBeUpdated,
recordsToUpdate, newCommitTime);
- // Upsert this updated record
- SpillableMapTestUtils.upsertRecords(updatedRecords, records);
- gRecord = (GenericRecord)
records.get(key).getData().getInsertValue(schema).get();
- // The record returned for this key should have the updated instantTime
- assert
newCommitTime.contentEquals(gRecord.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString());
}
@ParameterizedTest
@@ -293,65 +299,66 @@ public class TestExternalSpillableMap extends
HoodieCommonTestHarness {
Schema schema = SchemaTestUtil.getSimpleSchema();
- ExternalSpillableMap<String, HoodieRecord<? extends HoodieRecordPayload>>
records =
+ try (ExternalSpillableMap<String, HoodieRecord<? extends
HoodieRecordPayload>> records =
new ExternalSpillableMap<>(16L, basePath, new DefaultSizeEstimator(),
- new HoodieRecordSizeEstimator(schema), diskMapType,
isCompressionEnabled); // 16B
+ new HoodieRecordSizeEstimator(schema), diskMapType,
isCompressionEnabled)) { // 16B
- SchemaTestUtil testUtil = new SchemaTestUtil();
- List<String> recordKeys = new ArrayList<>();
+ SchemaTestUtil testUtil = new SchemaTestUtil();
+ List<String> recordKeys = new ArrayList<>();
+
+ // Ensure we spill to disk
+ while (records.getDiskBasedMapNumEntries() < 1) {
+ List<HoodieRecord> hoodieRecords =
testUtil.generateHoodieTestRecordsWithoutHoodieMetadata(0, 100);
+ hoodieRecords.stream().forEach(r -> {
+ records.put(r.getRecordKey(), r);
+ recordKeys.add(r.getRecordKey());
+ });
+ }
- // Ensure we spill to disk
- while (records.getDiskBasedMapNumEntries() < 1) {
- List<HoodieRecord> hoodieRecords =
testUtil.generateHoodieTestRecordsWithoutHoodieMetadata(0, 100);
- hoodieRecords.stream().forEach(r -> {
+ // Get a record from the in-Memory map
+ String key = recordKeys.get(0);
+ HoodieRecord record = records.get(key);
+ // Get the field we want to update
+ String fieldName = schema.getFields().stream().filter(field ->
field.schema().getType() == Schema.Type.STRING)
+ .findAny().get().name();
+ // Use a new value to update this field
+ String newValue = "update1";
+ List<HoodieRecord> recordsToUpdate = new ArrayList<>();
+ recordsToUpdate.add(record);
+
+ List<HoodieRecord> updatedRecords =
+
SchemaTestUtil.updateHoodieTestRecordsWithoutHoodieMetadata(recordsToUpdate,
schema, fieldName, newValue);
+
+ // Upsert this updated record
+ updatedRecords.forEach(r -> {
records.put(r.getRecordKey(), r);
- recordKeys.add(r.getRecordKey());
});
+ GenericRecord gRecord = (GenericRecord)
records.get(key).getData().getInsertValue(schema).get();
+ // The record returned for this key should have the updated value for
the field name
+ assertEquals(gRecord.get(fieldName).toString(), newValue);
+
+ // Get a record from the disk based map
+ key = recordKeys.get(recordKeys.size() - 1);
+ record = records.get(key);
+ // Get the field we want to update
+ fieldName = schema.getFields().stream().filter(field ->
field.schema().getType() == Schema.Type.STRING).findAny()
+ .get().name();
+ // Use a new value to update this field
+ newValue = "update2";
+ recordsToUpdate = new ArrayList<>();
+ recordsToUpdate.add(record);
+
+ updatedRecords =
+
SchemaTestUtil.updateHoodieTestRecordsWithoutHoodieMetadata(recordsToUpdate,
schema, fieldName, newValue);
+
+ // Upsert this updated record
+ updatedRecords.forEach(r -> {
+ records.put(r.getRecordKey(), r);
+ });
+ gRecord = (GenericRecord)
records.get(key).getData().getInsertValue(schema).get();
+ // The record returned for this key should have the updated value for
the field name
+ assertEquals(gRecord.get(fieldName).toString(), newValue);
}
-
- // Get a record from the in-Memory map
- String key = recordKeys.get(0);
- HoodieRecord record = records.get(key);
- // Get the field we want to update
- String fieldName = schema.getFields().stream().filter(field ->
field.schema().getType() == Schema.Type.STRING)
- .findAny().get().name();
- // Use a new value to update this field
- String newValue = "update1";
- List<HoodieRecord> recordsToUpdate = new ArrayList<>();
- recordsToUpdate.add(record);
-
- List<HoodieRecord> updatedRecords =
-
SchemaTestUtil.updateHoodieTestRecordsWithoutHoodieMetadata(recordsToUpdate,
schema, fieldName, newValue);
-
- // Upsert this updated record
- updatedRecords.forEach(r -> {
- records.put(r.getRecordKey(), r);
- });
- GenericRecord gRecord = (GenericRecord)
records.get(key).getData().getInsertValue(schema).get();
- // The record returned for this key should have the updated value for the
field name
- assertEquals(gRecord.get(fieldName).toString(), newValue);
-
- // Get a record from the disk based map
- key = recordKeys.get(recordKeys.size() - 1);
- record = records.get(key);
- // Get the field we want to update
- fieldName = schema.getFields().stream().filter(field ->
field.schema().getType() == Schema.Type.STRING).findAny()
- .get().name();
- // Use a new value to update this field
- newValue = "update2";
- recordsToUpdate = new ArrayList<>();
- recordsToUpdate.add(record);
-
- updatedRecords =
-
SchemaTestUtil.updateHoodieTestRecordsWithoutHoodieMetadata(recordsToUpdate,
schema, fieldName, newValue);
-
- // Upsert this updated record
- updatedRecords.forEach(r -> {
- records.put(r.getRecordKey(), r);
- });
- gRecord = (GenericRecord)
records.get(key).getData().getInsertValue(schema).get();
- // The record returned for this key should have the updated value for the
field name
- assertEquals(gRecord.get(fieldName).toString(), newValue);
}
@Test
@@ -360,29 +367,30 @@ public class TestExternalSpillableMap extends
HoodieCommonTestHarness {
final boolean isCompressionEnabled = false;
final Schema schema = SchemaTestUtil.getSimpleSchema();
- ExternalSpillableMap<String, HoodieRecord<? extends HoodieRecordPayload>>
records =
+ try (ExternalSpillableMap<String, HoodieRecord<? extends
HoodieRecordPayload>> records =
new ExternalSpillableMap<>(16L, basePath, new DefaultSizeEstimator(),
- new HoodieRecordSizeEstimator(schema), diskMapType,
isCompressionEnabled);
+ new HoodieRecordSizeEstimator(schema), diskMapType,
isCompressionEnabled)) {
- List<String> recordKeys = new ArrayList<>();
- SchemaTestUtil testUtil = new SchemaTestUtil();
+ List<String> recordKeys = new ArrayList<>();
+ SchemaTestUtil testUtil = new SchemaTestUtil();
- // Put a single record. Payload size estimation happens as part of this
initial put.
- HoodieRecord seedRecord =
testUtil.generateHoodieTestRecordsWithoutHoodieMetadata(0, 1).get(0);
- records.put(seedRecord.getRecordKey(), seedRecord);
-
- // Remove the key immediately to make the map empty again.
- records.remove(seedRecord.getRecordKey());
-
- // Verify payload size re-estimation does not throw exception
- SchemaTestUtil testUtilx = new SchemaTestUtil();
- List<HoodieRecord> hoodieRecords =
testUtil.generateHoodieTestRecordsWithoutHoodieMetadata(0, 250);
- hoodieRecords.stream().forEach(hoodieRecord -> {
- assertDoesNotThrow(() -> {
- records.put(hoodieRecord.getRecordKey(), hoodieRecord);
- }, "ExternalSpillableMap put() should not throw exception!");
- recordKeys.add(hoodieRecord.getRecordKey());
- });
+ // Put a single record. Payload size estimation happens as part of this
initial put.
+ HoodieRecord seedRecord =
testUtil.generateHoodieTestRecordsWithoutHoodieMetadata(0, 1).get(0);
+ records.put(seedRecord.getRecordKey(), seedRecord);
+
+ // Remove the key immediately to make the map empty again.
+ records.remove(seedRecord.getRecordKey());
+
+ // Verify payload size re-estimation does not throw exception
+ SchemaTestUtil testUtilx = new SchemaTestUtil();
+ List<HoodieRecord> hoodieRecords =
testUtil.generateHoodieTestRecordsWithoutHoodieMetadata(0, 250);
+ hoodieRecords.stream().forEach(hoodieRecord -> {
+ assertDoesNotThrow(() -> {
+ records.put(hoodieRecord.getRecordKey(), hoodieRecord);
+ }, "ExternalSpillableMap put() should not throw exception!");
+ recordKeys.add(hoodieRecord.getRecordKey());
+ });
+ }
}
@ParameterizedTest
@@ -408,28 +416,48 @@ public class TestExternalSpillableMap extends
HoodieCommonTestHarness {
// Estimate the first record size and calculate the total memory size that
the in-memory map can only contain 100 records.
long estimatedPayloadSize = keyEstimator.sizeEstimate(key) +
valEstimator.sizeEstimate(record);
long totalEstimatedSizeWith100Records = (long) ((estimatedPayloadSize *
100) / 0.8);
- ExternalSpillableMap<String, HoodieRecord<? extends HoodieRecordPayload>>
records =
+ try (ExternalSpillableMap<String, HoodieRecord<? extends
HoodieRecordPayload>> records =
new ExternalSpillableMap<>(totalEstimatedSizeWith100Records, basePath,
new DefaultSizeEstimator(),
- new HoodieRecordSizeEstimator(schema), diskMapType,
isCompressionEnabled);
-
- // Insert 100 records and then in-memory map will contain 100 records.
- SpillableMapTestUtils.upsertRecords(iRecords, records);
-
- // Generate one record and it will be spilled to disk
- List<IndexedRecord> singleRecord = testUtil.generateHoodieTestRecords(0,
1);
- List<String> singleRecordKey =
SpillableMapTestUtils.upsertRecords(singleRecord, records);
-
- // Get the field we want to update
- String fieldName = schema.getFields().stream().filter(field ->
field.schema().getType() == Schema.Type.STRING).findAny()
- .get().name();
- HoodieRecord hoodieRecord = records.get(singleRecordKey.get(0));
- // Use a new value to update this field, the estimate size of this record
will be less than the first record.
- String newValue = "";
- HoodieRecord updatedRecord =
-
SchemaTestUtil.updateHoodieTestRecordsWithoutHoodieMetadata(Arrays.asList(hoodieRecord),
schema, fieldName, newValue).get(0);
- records.put(updatedRecord.getRecordKey(), updatedRecord);
-
- assertEquals(records.size(), 101);
+ new HoodieRecordSizeEstimator(schema), diskMapType,
isCompressionEnabled)) {
+
+ // Insert 100 records and then in-memory map will contain 100 records.
+ SpillableMapTestUtils.upsertRecords(iRecords, records);
+
+ // Generate one record and it will be spilled to disk
+ List<IndexedRecord> singleRecord = testUtil.generateHoodieTestRecords(0,
1);
+ List<String> singleRecordKey =
SpillableMapTestUtils.upsertRecords(singleRecord, records);
+
+ // Get the field we want to update
+ String fieldName = schema.getFields().stream().filter(field ->
field.schema().getType() == Schema.Type.STRING).findAny()
+ .get().name();
+ HoodieRecord hoodieRecord = records.get(singleRecordKey.get(0));
+ // Use a new value to update this field, the estimate size of this
record will be less than the first record.
+ String newValue = "";
+ HoodieRecord updatedRecord =
+
SchemaTestUtil.updateHoodieTestRecordsWithoutHoodieMetadata(Arrays.asList(hoodieRecord),
schema, fieldName, newValue).get(0);
+ records.put(updatedRecord.getRecordKey(), updatedRecord);
+
+ assertEquals(records.size(), 101);
+ }
+ }
+
+ @ParameterizedTest
+ @EnumSource(value = ExternalSpillableMap.DiskMapType.class)
+ void assertEmptyMapOperations(ExternalSpillableMap.DiskMapType diskMapType)
throws IOException {
+ // validate that operations on an empty map work as expected
+ try (ExternalSpillableMap<String, HoodieRecord<? extends
HoodieRecordPayload>> records =
+ new ExternalSpillableMap<>(10, basePath, new
DefaultSizeEstimator(),
+ new DefaultSizeEstimator<>(), diskMapType, false)) {
+ assertTrue(records.isEmpty());
+ assertFalse(records.containsKey("key"));
+ assertFalse(records.containsValue("value"));
+ assertTrue(records.keySet().isEmpty());
+ assertTrue(records.values().isEmpty());
+ assertTrue(records.entrySet().isEmpty());
+ assertEquals(0, records.valueStream().count());
+ assertEquals(0, records.size());
+ assertFalse(records.iterator().hasNext());
+ }
}
private static Stream<Arguments> testArguments() {
diff --git
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadSnapshotReader.java
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadSnapshotReader.java
index a4dc9b0cc67..219d703e2e8 100644
---
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadSnapshotReader.java
+++
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadSnapshotReader.java
@@ -217,5 +217,8 @@ public class HoodieMergeOnReadSnapshotReader extends
AbstractRealtimeRecordReade
if (logRecordScanner != null) {
logRecordScanner.close();
}
+ if (mergedRecordsByKey != null) {
+ mergedRecordsByKey.close();
+ }
}
}