This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 4ed41e0f15e [HUDI-7340] Use spillable map for cached log records in HoodieBaseFileGroupRecordBuffer (#10588)
4ed41e0f15e is described below
commit 4ed41e0f15e65431799340bb655d28db92de34b9
Author: Danny Chan <[email protected]>
AuthorDate: Thu Feb 1 08:43:21 2024 +0800
[HUDI-7340] Use spillable map for cached log records in HoodieBaseFileGroupRecordBuffer (#10588)
---
.../table/log/HoodieMergedLogRecordReader.java | 3 ++-
.../read/HoodieBaseFileGroupRecordBuffer.java | 27 ++++++++++++++++------
.../common/table/read/HoodieFileGroupReader.java | 11 ++++++---
.../table/read/HoodieFileGroupRecordBuffer.java | 7 +++---
.../read/HoodieKeyBasedFileGroupRecordBuffer.java | 16 +++++++++----
.../HoodiePositionBasedFileGroupRecordBuffer.java | 14 +++++++----
.../common/util/HoodieRecordSizeEstimator.java | 5 ++--
.../table/read/TestHoodieFileGroupReaderBase.java | 5 ++++
.../reader/HoodieFileGroupReaderTestUtils.java | 8 ++++++-
...odieFileGroupReaderBasedParquetFileFormat.scala | 17 ++++++++++----
...stHoodiePositionBasedFileGroupRecordBuffer.java | 7 +++++-
11 files changed, 88 insertions(+), 32 deletions(-)
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java
index 44c4c973eae..6b31c200907 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java
@@ -40,6 +40,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
+import java.io.Serializable;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@@ -183,7 +184,7 @@ public class HoodieMergedLogRecordReader<T> extends
BaseHoodieLogRecordReader<T>
return recordBuffer.getLogRecordIterator();
}
- public Map<Object, Pair<Option<T>, Map<String, Object>>> getRecords() {
+ public Map<Serializable, Pair<Option<T>, Map<String, Object>>> getRecords() {
return recordBuffer.getLogRecords();
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
index 2f695cf0249..70ddb5abff2 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
@@ -27,11 +27,15 @@ import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.table.log.KeySpec;
import org.apache.hudi.common.table.log.block.HoodieDataBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
+import org.apache.hudi.common.util.DefaultSizeEstimator;
+import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieCorruptedDataException;
+import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieKeyException;
import org.apache.hudi.exception.HoodieValidationException;
@@ -39,8 +43,8 @@ import org.apache.avro.Schema;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
import java.io.IOException;
+import java.io.Serializable;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -56,7 +60,7 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T>
implements HoodieFileGr
protected final Option<String[]> partitionPathFieldOpt;
protected final HoodieRecordMerger recordMerger;
protected final TypedProperties payloadProps;
- protected final Map<Object, Pair<Option<T>, Map<String, Object>>> records;
+ protected final ExternalSpillableMap<Serializable, Pair<Option<T>,
Map<String, Object>>> records;
protected ClosableIterator<T> baseFileIterator;
protected Iterator<Pair<Option<T>, Map<String, Object>>> logRecordIterator;
protected T nextRecord;
@@ -68,7 +72,11 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T>
implements HoodieFileGr
Option<String>
partitionNameOverrideOpt,
Option<String[]>
partitionPathFieldOpt,
HoodieRecordMerger recordMerger,
- TypedProperties payloadProps) {
+ TypedProperties payloadProps,
+ long maxMemorySizeInBytes,
+ String spillableMapBasePath,
+ ExternalSpillableMap.DiskMapType
diskMapType,
+ boolean
isBitCaskDiskMapCompressionEnabled) {
this.readerContext = readerContext;
this.readerSchema = readerSchema;
this.baseFileSchema = baseFileSchema;
@@ -76,7 +84,13 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T>
implements HoodieFileGr
this.partitionPathFieldOpt = partitionPathFieldOpt;
this.recordMerger = recordMerger;
this.payloadProps = payloadProps;
- this.records = new HashMap<>();
+ try {
+ // Store merged records for all versions for this log file, set the
in-memory footprint to maxInMemoryMapSize
+ this.records = new ExternalSpillableMap<>(maxMemorySizeInBytes,
spillableMapBasePath, new DefaultSizeEstimator<>(),
+ new HoodieRecordSizeEstimator<>(readerSchema), diskMapType,
isBitCaskDiskMapCompressionEnabled);
+ } catch (IOException e) {
+ throw new HoodieIOException("IOException when creating
ExternalSpillableMap at " + spillableMapBasePath, e);
+ }
}
@Override
@@ -104,7 +118,7 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T>
implements HoodieFileGr
}
@Override
- public Map<Object, Pair<Option<T>, Map<String, Object>>> getLogRecords() {
+ public Map<Serializable, Pair<Option<T>, Map<String, Object>>>
getLogRecords() {
return records;
}
@@ -220,8 +234,7 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T>
implements HoodieFileGr
* @return
* @throws IOException
*/
- protected Pair<ClosableIterator<T>, Schema> getRecordsIterator(
- HoodieDataBlock dataBlock, Option<KeySpec> keySpecOpt) throws
IOException {
+ protected Pair<ClosableIterator<T>, Schema>
getRecordsIterator(HoodieDataBlock dataBlock, Option<KeySpec> keySpecOpt) {
ClosableIterator<T> blockRecordsIterator;
if (keySpecOpt.isPresent()) {
KeySpec keySpec = keySpecOpt.get();
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
index 8edf5d7130e..3984177bdbf 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
@@ -35,6 +35,7 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.CachingIterator;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.EmptyIterator;
+import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieIOException;
@@ -107,7 +108,11 @@ public final class HoodieFileGroupReader<T> implements
Closeable {
HoodieTableConfig tableConfig,
long start,
long length,
- boolean shouldUseRecordPosition) {
+ boolean shouldUseRecordPosition,
+ long maxMemorySizeInBytes,
+ String spillableMapBasePath,
+ ExternalSpillableMap.DiskMapType diskMapType,
+ boolean isBitCaskDiskMapCompressionEnabled) {
this.readerContext = readerContext;
this.hadoopConf = hadoopConf;
this.hoodieBaseFileOption = fileSlice.getBaseFile();
@@ -135,10 +140,10 @@ public final class HoodieFileGroupReader<T> implements
Closeable {
: shouldUseRecordPosition
? new HoodiePositionBasedFileGroupRecordBuffer<>(
readerContext, requiredSchema, requiredSchema, Option.empty(),
Option.empty(),
- recordMerger, props)
+ recordMerger, props, maxMemorySizeInBytes, spillableMapBasePath,
diskMapType, isBitCaskDiskMapCompressionEnabled)
: new HoodieKeyBasedFileGroupRecordBuffer<>(
readerContext, requiredSchema, requiredSchema, Option.empty(),
Option.empty(),
- recordMerger, props);
+ recordMerger, props, maxMemorySizeInBytes, spillableMapBasePath,
diskMapType, isBitCaskDiskMapCompressionEnabled);
}
/**
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupRecordBuffer.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupRecordBuffer.java
index 0bf27cfc71e..ccc001e79c9 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupRecordBuffer.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupRecordBuffer.java
@@ -28,6 +28,7 @@ import
org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.Pair;
import java.io.IOException;
+import java.io.Serializable;
import java.util.Iterator;
import java.util.Map;
@@ -58,7 +59,7 @@ public interface HoodieFileGroupRecordBuffer<T> {
* @param metadata
* @throws Exception
*/
- void processNextDataRecord(T record, Map<String, Object> metadata, Object
index) throws IOException;
+ void processNextDataRecord(T record, Map<String, Object> metadata,
Serializable index) throws IOException;
/**
* Process a log delete block, and store the resulting records into the
buffer.
@@ -73,7 +74,7 @@ public interface HoodieFileGroupRecordBuffer<T> {
*
* @param deleteRecord
*/
- void processNextDeletedRecord(DeleteRecord deleteRecord, Object index);
+ void processNextDeletedRecord(DeleteRecord deleteRecord, Serializable index);
/**
* Check if a record exists in the buffered records.
@@ -93,7 +94,7 @@ public interface HoodieFileGroupRecordBuffer<T> {
/**
* @return The underlying data stored in the buffer.
*/
- Map<Object, Pair<Option<T>, Map<String, Object>>> getLogRecords();
+ Map<Serializable, Pair<Option<T>, Map<String, Object>>> getLogRecords();
/**
* Link the base file iterator for consequential merge.
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java
index 2ab6048031f..b4e32be8c65 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java
@@ -29,11 +29,13 @@ import
org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.avro.Schema;
import java.io.IOException;
+import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
@@ -52,9 +54,13 @@ public class HoodieKeyBasedFileGroupRecordBuffer<T> extends
HoodieBaseFileGroupR
Option<String>
partitionNameOverrideOpt,
Option<String[]>
partitionPathFieldOpt,
HoodieRecordMerger recordMerger,
- TypedProperties payloadProps) {
+ TypedProperties payloadProps,
+ long maxMemorySizeInBytes,
+ String spillableMapBasePath,
+ ExternalSpillableMap.DiskMapType
diskMapType,
+ boolean
isBitCaskDiskMapCompressionEnabled) {
super(readerContext, readerSchema, baseFileSchema,
partitionNameOverrideOpt, partitionPathFieldOpt,
- recordMerger, payloadProps);
+ recordMerger, payloadProps, maxMemorySizeInBytes,
spillableMapBasePath, diskMapType, isBitCaskDiskMapCompressionEnabled);
}
@Override
@@ -84,7 +90,7 @@ public class HoodieKeyBasedFileGroupRecordBuffer<T> extends
HoodieBaseFileGroupR
}
@Override
- public void processNextDataRecord(T record, Map<String, Object> metadata,
Object recordKey) throws IOException {
+ public void processNextDataRecord(T record, Map<String, Object> metadata,
Serializable recordKey) throws IOException {
Pair<Option<T>, Map<String, Object>> existingRecordMetadataPair =
records.get(recordKey);
Option<Pair<T, Map<String, Object>>> mergedRecordAndMetadata =
doProcessNextDataRecord(record, metadata, existingRecordMetadataPair);
@@ -96,7 +102,7 @@ public class HoodieKeyBasedFileGroupRecordBuffer<T> extends
HoodieBaseFileGroupR
}
@Override
- public void processDeleteBlock(HoodieDeleteBlock deleteBlock) throws
IOException {
+ public void processDeleteBlock(HoodieDeleteBlock deleteBlock) {
Iterator<DeleteRecord> it =
Arrays.stream(deleteBlock.getRecordsToDelete()).iterator();
while (it.hasNext()) {
DeleteRecord record = it.next();
@@ -106,7 +112,7 @@ public class HoodieKeyBasedFileGroupRecordBuffer<T> extends
HoodieBaseFileGroupR
}
@Override
- public void processNextDeletedRecord(DeleteRecord deleteRecord, Object
recordKey) {
+ public void processNextDeletedRecord(DeleteRecord deleteRecord, Serializable
recordKey) {
Pair<Option<T>, Map<String, Object>> existingRecordMetadataPair =
records.get(recordKey);
Option<DeleteRecord> recordOpt = doProcessNextDeletedRecord(deleteRecord,
existingRecordMetadataPair);
if (recordOpt.isPresent()) {
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
index de63b0fb2e3..4412713928f 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
@@ -29,11 +29,13 @@ import
org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.avro.Schema;
import java.io.IOException;
+import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
@@ -59,9 +61,13 @@ public class HoodiePositionBasedFileGroupRecordBuffer<T>
extends HoodieBaseFileG
Option<String>
partitionNameOverrideOpt,
Option<String[]>
partitionPathFieldOpt,
HoodieRecordMerger
recordMerger,
- TypedProperties
payloadProps) {
+ TypedProperties payloadProps,
+ long maxMemorySizeInBytes,
+ String spillableMapBasePath,
+
ExternalSpillableMap.DiskMapType diskMapType,
+ boolean
isBitCaskDiskMapCompressionEnabled) {
super(readerContext, readerSchema, baseFileSchema,
partitionNameOverrideOpt, partitionPathFieldOpt,
- recordMerger, payloadProps);
+ recordMerger, payloadProps, maxMemorySizeInBytes,
spillableMapBasePath, diskMapType, isBitCaskDiskMapCompressionEnabled);
}
@Override
@@ -114,7 +120,7 @@ public class HoodiePositionBasedFileGroupRecordBuffer<T>
extends HoodieBaseFileG
}
@Override
- public void processNextDataRecord(T record, Map<String, Object> metadata,
Object recordPosition) throws IOException {
+ public void processNextDataRecord(T record, Map<String, Object> metadata,
Serializable recordPosition) throws IOException {
Pair<Option<T>, Map<String, Object>> existingRecordMetadataPair =
records.get(recordPosition);
Option<Pair<T, Map<String, Object>>> mergedRecordAndMetadata =
doProcessNextDataRecord(record, metadata, existingRecordMetadataPair);
@@ -146,7 +152,7 @@ public class HoodiePositionBasedFileGroupRecordBuffer<T>
extends HoodieBaseFileG
}
@Override
- public void processNextDeletedRecord(DeleteRecord deleteRecord, Object
recordPosition) {
+ public void processNextDeletedRecord(DeleteRecord deleteRecord, Serializable
recordPosition) {
Pair<Option<T>, Map<String, Object>> existingRecordMetadataPair =
records.get(recordPosition);
Option<DeleteRecord> recordOpt = doProcessNextDeletedRecord(deleteRecord,
existingRecordMetadataPair);
if (recordOpt.isPresent()) {
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordSizeEstimator.java
b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordSizeEstimator.java
index 8c2514aa51f..f1e865aa281 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordSizeEstimator.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordSizeEstimator.java
@@ -18,7 +18,6 @@
package org.apache.hudi.common.util;
-import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.avro.Schema;
@@ -30,7 +29,7 @@ import org.slf4j.LoggerFactory;
*
* @param <T>
*/
-public class HoodieRecordSizeEstimator<T> implements
SizeEstimator<HoodieRecord<T>> {
+public class HoodieRecordSizeEstimator<T> implements SizeEstimator<T> {
private static final Logger LOG =
LoggerFactory.getLogger(HoodieRecordSizeEstimator.class);
@@ -41,7 +40,7 @@ public class HoodieRecordSizeEstimator<T> implements
SizeEstimator<HoodieRecord<
}
@Override
- public long sizeEstimate(HoodieRecord<T> hoodieRecord) {
+ public long sizeEstimate(T hoodieRecord) {
// Most HoodieRecords are bound to have data + schema. Although, the same
schema object is shared amongst
// all records in the JVM. Calculate and print the size of the Schema and
of the Record to
// note the sizes and differences. A correct estimation in such cases is
handled in
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
index 59c078e929b..5fc1895ef78 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
@@ -34,6 +34,7 @@ import
org.apache.hudi.common.table.view.FileSystemViewManager;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.SyncableFileSystemView;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.metadata.HoodieTableMetadata;
@@ -179,6 +180,10 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
metaClient.getTableConfig(),
0,
fileSlice.getTotalFileSize(),
+ false,
+ 1024 * 1024 * 1000,
+ metaClient.getTempFolderPath(),
+ ExternalSpillableMap.DiskMapType.ROCKS_DB,
false);
fileGroupReader.initRecordIterators();
while (fileGroupReader.hasNext()) {
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileGroupReaderTestUtils.java
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileGroupReaderTestUtils.java
index 39ddc42bc37..add4032b6c2 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileGroupReaderTestUtils.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileGroupReaderTestUtils.java
@@ -23,8 +23,10 @@ import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieReaderContext;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.read.HoodieFileGroupReader;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
@@ -121,7 +123,11 @@ public class HoodieFileGroupReaderTestUtils {
tableConfig,
start,
length,
- shouldUseRecordPosition);
+ shouldUseRecordPosition,
+ 1024 * 1024 * 1000,
+ basePath + "/" + HoodieTableMetaClient.TEMPFOLDER_NAME,
+ ExternalSpillableMap.DiskMapType.ROCKS_DB,
+ false);
}
}
}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
index f9c3d008fd0..3de30c770c7 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
@@ -22,13 +22,17 @@ import org.apache.hadoop.fs.Path
import org.apache.hudi.MergeOnReadSnapshotRelation.createPartitionedFile
import org.apache.hudi.avro.AvroSchemaUtils
import org.apache.hudi.cdc.{CDCFileGroupIterator, CDCRelation,
HoodieCDCFileGroupSplit}
-import org.apache.hudi.common.config.TypedProperties
+import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMemoryConfig,
HoodieStorageConfig, TypedProperties}
import org.apache.hudi.common.engine.HoodieReaderContext
import org.apache.hudi.common.model.{FileSlice, HoodieLogFile, HoodieRecord}
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import org.apache.hudi.common.table.read.HoodieFileGroupReader
import org.apache.hudi.{AvroConversionUtils, HoodieFileIndex,
HoodiePartitionCDCFileGroupMapping, HoodiePartitionFileSliceMapping,
HoodieSparkUtils, HoodieTableSchema, HoodieTableState, SparkAdapterSupport,
SparkFileFormatInternalRowReaderContext}
import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.util.FileIOUtils
+import org.apache.hudi.common.util.collection.ExternalSpillableMap
+import org.apache.hudi.common.util.collection.ExternalSpillableMap.DiskMapType
+import org.apache.hudi.config.HoodieWriteConfig
import
org.apache.spark.sql.HoodieCatalystExpressionUtils.generateUnsafeProjection
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
@@ -41,6 +45,7 @@ import org.apache.spark.sql.types.{LongType, Metadata,
MetadataBuilder, StringTy
import org.apache.spark.util.SerializableConfiguration
import java.io.Closeable
+import java.util.Locale
import scala.annotation.tailrec
import scala.collection.mutable
import scala.jdk.CollectionConverters.asScalaIteratorConverter
@@ -118,7 +123,7 @@ class
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
val broadcastedHadoopConf = spark.sparkContext.broadcast(new
SerializableConfiguration(augmentedHadoopConf))
val broadcastedDataSchema = spark.sparkContext.broadcast(dataAvroSchema)
val broadcastedRequestedSchema =
spark.sparkContext.broadcast(requestedAvroSchema)
- val props: TypedProperties = HoodieFileIndex.getConfigProperties(spark,
options)
+ val fileIndexProps: TypedProperties =
HoodieFileIndex.getConfigProperties(spark, options)
(file: PartitionedFile) => {
file.partitionValues match {
@@ -153,7 +158,11 @@ class
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
metaClient.getTableConfig,
file.start,
file.length,
- shouldUseRecordPosition)
+ shouldUseRecordPosition,
+
options.getOrElse(HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE.key(),
HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE.defaultValue() + "").toLong,
+
options.getOrElse(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.key(),
FileIOUtils.getDefaultSpillableMapBasePath),
+
DiskMapType.valueOf(options.getOrElse(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key(),
HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue().name()).toUpperCase(Locale.ROOT)),
+
options.getOrElse(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(),
HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue().toString).toBoolean)
reader.initRecordIterators()
// Append partition values to rows and project to output schema
appendPartitionAndProject(
@@ -172,7 +181,7 @@ class
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
val fileSplits =
hoodiePartitionCDCFileGroupSliceMapping.getFileSplits().toArray
val fileGroupSplit: HoodieCDCFileGroupSplit =
HoodieCDCFileGroupSplit(fileSplits)
buildCDCRecordIterator(
- fileGroupSplit, cdcFileReader, broadcastedHadoopConf.value.value,
props, requiredSchema)
+ fileGroupSplit, cdcFileReader, broadcastedHadoopConf.value.value,
fileIndexProps, requiredSchema)
// TODO: Use FileGroupReader here: HUDI-6942.
case _ => baseFileReader(file)
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodiePositionBasedFileGroupRecordBuffer.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodiePositionBasedFileGroupRecordBuffer.java
index 682558f79f9..da410f1f4a7 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodiePositionBasedFileGroupRecordBuffer.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodiePositionBasedFileGroupRecordBuffer.java
@@ -35,6 +35,7 @@ import
org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.SchemaTestUtil;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieValidationException;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
@@ -103,7 +104,11 @@ public class TestHoodiePositionBasedFileGroupRecordBuffer
extends TestHoodieFile
partitionNameOpt,
partitionFields,
useCustomMerger ? new CustomMerger() : new HoodieSparkRecordMerger(),
- new TypedProperties());
+ new TypedProperties(),
+ 1024 * 1024 * 1000,
+ metaClient.getTempFolderPath(),
+ ExternalSpillableMap.DiskMapType.ROCKS_DB,
+ false);
}
public Map<HoodieLogBlock.HeaderMetadataType, String> getHeader() {