This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 4ed41e0f15e [HUDI-7340] Use spillable map for cached log records in HoodieBaseFileGroupRecordBuffer (#10588)
4ed41e0f15e is described below

commit 4ed41e0f15e65431799340bb655d28db92de34b9
Author: Danny Chan <[email protected]>
AuthorDate: Thu Feb 1 08:43:21 2024 +0800

    [HUDI-7340] Use spillable map for cached log records in HoodieBaseFileGroupRecordBuffer (#10588)
---
 .../table/log/HoodieMergedLogRecordReader.java     |  3 ++-
 .../read/HoodieBaseFileGroupRecordBuffer.java      | 27 ++++++++++++++++------
 .../common/table/read/HoodieFileGroupReader.java   | 11 ++++++---
 .../table/read/HoodieFileGroupRecordBuffer.java    |  7 +++---
 .../read/HoodieKeyBasedFileGroupRecordBuffer.java  | 16 +++++++++----
 .../HoodiePositionBasedFileGroupRecordBuffer.java  | 14 +++++++----
 .../common/util/HoodieRecordSizeEstimator.java     |  5 ++--
 .../table/read/TestHoodieFileGroupReaderBase.java  |  5 ++++
 .../reader/HoodieFileGroupReaderTestUtils.java     |  8 ++++++-
 ...odieFileGroupReaderBasedParquetFileFormat.scala | 17 ++++++++++----
 ...stHoodiePositionBasedFileGroupRecordBuffer.java |  7 +++++-
 11 files changed, 88 insertions(+), 32 deletions(-)

diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java
index 44c4c973eae..6b31c200907 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java
@@ -40,6 +40,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
+import java.io.Serializable;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -183,7 +184,7 @@ public class HoodieMergedLogRecordReader<T> extends 
BaseHoodieLogRecordReader<T>
     return recordBuffer.getLogRecordIterator();
   }
 
-  public Map<Object, Pair<Option<T>, Map<String, Object>>> getRecords() {
+  public Map<Serializable, Pair<Option<T>, Map<String, Object>>> getRecords() {
     return recordBuffer.getLogRecords();
   }
 
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
index 2f695cf0249..70ddb5abff2 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
@@ -27,11 +27,15 @@ import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.table.log.KeySpec;
 import org.apache.hudi.common.table.log.block.HoodieDataBlock;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock;
+import org.apache.hudi.common.util.DefaultSizeEstimator;
+import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.ExternalSpillableMap;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieCorruptedDataException;
+import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieKeyException;
 import org.apache.hudi.exception.HoodieValidationException;
 
@@ -39,8 +43,8 @@ import org.apache.avro.Schema;
 import org.roaringbitmap.longlong.Roaring64NavigableMap;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -56,7 +60,7 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> 
implements HoodieFileGr
   protected final Option<String[]> partitionPathFieldOpt;
   protected final HoodieRecordMerger recordMerger;
   protected final TypedProperties payloadProps;
-  protected final Map<Object, Pair<Option<T>, Map<String, Object>>> records;
+  protected final ExternalSpillableMap<Serializable, Pair<Option<T>, 
Map<String, Object>>> records;
   protected ClosableIterator<T> baseFileIterator;
   protected Iterator<Pair<Option<T>, Map<String, Object>>> logRecordIterator;
   protected T nextRecord;
@@ -68,7 +72,11 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> 
implements HoodieFileGr
                                          Option<String> 
partitionNameOverrideOpt,
                                          Option<String[]> 
partitionPathFieldOpt,
                                          HoodieRecordMerger recordMerger,
-                                         TypedProperties payloadProps) {
+                                         TypedProperties payloadProps,
+                                         long maxMemorySizeInBytes,
+                                         String spillableMapBasePath,
+                                         ExternalSpillableMap.DiskMapType 
diskMapType,
+                                         boolean 
isBitCaskDiskMapCompressionEnabled) {
     this.readerContext = readerContext;
     this.readerSchema = readerSchema;
     this.baseFileSchema = baseFileSchema;
@@ -76,7 +84,13 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> 
implements HoodieFileGr
     this.partitionPathFieldOpt = partitionPathFieldOpt;
     this.recordMerger = recordMerger;
     this.payloadProps = payloadProps;
-    this.records = new HashMap<>();
+    try {
+      // Store merged records for all versions for this log file, set the in-memory footprint to maxMemorySizeInBytes
+      this.records = new ExternalSpillableMap<>(maxMemorySizeInBytes, 
spillableMapBasePath, new DefaultSizeEstimator<>(),
+          new HoodieRecordSizeEstimator<>(readerSchema), diskMapType, 
isBitCaskDiskMapCompressionEnabled);
+    } catch (IOException e) {
+      throw new HoodieIOException("IOException when creating 
ExternalSpillableMap at " + spillableMapBasePath, e);
+    }
   }
 
   @Override
@@ -104,7 +118,7 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> 
implements HoodieFileGr
   }
 
   @Override
-  public Map<Object, Pair<Option<T>, Map<String, Object>>> getLogRecords() {
+  public Map<Serializable, Pair<Option<T>, Map<String, Object>>> 
getLogRecords() {
     return records;
   }
 
@@ -220,8 +234,7 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> 
implements HoodieFileGr
    * @return
    * @throws IOException
    */
-  protected Pair<ClosableIterator<T>, Schema> getRecordsIterator(
-      HoodieDataBlock dataBlock, Option<KeySpec> keySpecOpt) throws 
IOException {
+  protected Pair<ClosableIterator<T>, Schema> 
getRecordsIterator(HoodieDataBlock dataBlock, Option<KeySpec> keySpecOpt) {
     ClosableIterator<T> blockRecordsIterator;
     if (keySpecOpt.isPresent()) {
       KeySpec keySpec = keySpecOpt.get();
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
index 8edf5d7130e..3984177bdbf 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
@@ -35,6 +35,7 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.CachingIterator;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.EmptyIterator;
+import org.apache.hudi.common.util.collection.ExternalSpillableMap;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieIOException;
 
@@ -107,7 +108,11 @@ public final class HoodieFileGroupReader<T> implements 
Closeable {
                                HoodieTableConfig tableConfig,
                                long start,
                                long length,
-                               boolean shouldUseRecordPosition) {
+                               boolean shouldUseRecordPosition,
+                               long maxMemorySizeInBytes,
+                               String spillableMapBasePath,
+                               ExternalSpillableMap.DiskMapType diskMapType,
+                               boolean isBitCaskDiskMapCompressionEnabled) {
     this.readerContext = readerContext;
     this.hadoopConf = hadoopConf;
     this.hoodieBaseFileOption = fileSlice.getBaseFile();
@@ -135,10 +140,10 @@ public final class HoodieFileGroupReader<T> implements 
Closeable {
         : shouldUseRecordPosition
         ? new HoodiePositionBasedFileGroupRecordBuffer<>(
         readerContext, requiredSchema, requiredSchema, Option.empty(), 
Option.empty(),
-        recordMerger, props)
+        recordMerger, props, maxMemorySizeInBytes, spillableMapBasePath, 
diskMapType, isBitCaskDiskMapCompressionEnabled)
         : new HoodieKeyBasedFileGroupRecordBuffer<>(
         readerContext, requiredSchema, requiredSchema, Option.empty(), 
Option.empty(),
-        recordMerger, props);
+        recordMerger, props, maxMemorySizeInBytes, spillableMapBasePath, 
diskMapType, isBitCaskDiskMapCompressionEnabled);
   }
 
   /**
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupRecordBuffer.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupRecordBuffer.java
index 0bf27cfc71e..ccc001e79c9 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupRecordBuffer.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupRecordBuffer.java
@@ -28,6 +28,7 @@ import 
org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.Pair;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.Iterator;
 import java.util.Map;
 
@@ -58,7 +59,7 @@ public interface HoodieFileGroupRecordBuffer<T> {
    * @param metadata
    * @throws Exception
    */
-  void processNextDataRecord(T record, Map<String, Object> metadata, Object 
index) throws IOException;
+  void processNextDataRecord(T record, Map<String, Object> metadata, 
Serializable index) throws IOException;
 
   /**
    * Process a log delete block, and store the resulting records into the 
buffer.
@@ -73,7 +74,7 @@ public interface HoodieFileGroupRecordBuffer<T> {
    *
    * @param deleteRecord
    */
-  void processNextDeletedRecord(DeleteRecord deleteRecord, Object index);
+  void processNextDeletedRecord(DeleteRecord deleteRecord, Serializable index);
 
   /**
    * Check if a record exists in the buffered records.
@@ -93,7 +94,7 @@ public interface HoodieFileGroupRecordBuffer<T> {
   /**
    * @return The underlying data stored in the buffer.
    */
-  Map<Object, Pair<Option<T>, Map<String, Object>>> getLogRecords();
+  Map<Serializable, Pair<Option<T>, Map<String, Object>>> getLogRecords();
 
   /**
    * Link the base file iterator for consequential merge.
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java
index 2ab6048031f..b4e32be8c65 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java
@@ -29,11 +29,13 @@ import 
org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.ExternalSpillableMap;
 import org.apache.hudi.common.util.collection.Pair;
 
 import org.apache.avro.Schema;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
@@ -52,9 +54,13 @@ public class HoodieKeyBasedFileGroupRecordBuffer<T> extends 
HoodieBaseFileGroupR
                                              Option<String> 
partitionNameOverrideOpt,
                                              Option<String[]> 
partitionPathFieldOpt,
                                              HoodieRecordMerger recordMerger,
-                                             TypedProperties payloadProps) {
+                                             TypedProperties payloadProps,
+                                             long maxMemorySizeInBytes,
+                                             String spillableMapBasePath,
+                                             ExternalSpillableMap.DiskMapType 
diskMapType,
+                                             boolean 
isBitCaskDiskMapCompressionEnabled) {
     super(readerContext, readerSchema, baseFileSchema, 
partitionNameOverrideOpt, partitionPathFieldOpt,
-        recordMerger, payloadProps);
+        recordMerger, payloadProps, maxMemorySizeInBytes, 
spillableMapBasePath, diskMapType, isBitCaskDiskMapCompressionEnabled);
   }
 
   @Override
@@ -84,7 +90,7 @@ public class HoodieKeyBasedFileGroupRecordBuffer<T> extends 
HoodieBaseFileGroupR
   }
 
   @Override
-  public void processNextDataRecord(T record, Map<String, Object> metadata, 
Object recordKey) throws IOException {
+  public void processNextDataRecord(T record, Map<String, Object> metadata, 
Serializable recordKey) throws IOException {
     Pair<Option<T>, Map<String, Object>> existingRecordMetadataPair = 
records.get(recordKey);
     Option<Pair<T, Map<String, Object>>> mergedRecordAndMetadata =
         doProcessNextDataRecord(record, metadata, existingRecordMetadataPair);
@@ -96,7 +102,7 @@ public class HoodieKeyBasedFileGroupRecordBuffer<T> extends 
HoodieBaseFileGroupR
   }
 
   @Override
-  public void processDeleteBlock(HoodieDeleteBlock deleteBlock) throws 
IOException {
+  public void processDeleteBlock(HoodieDeleteBlock deleteBlock) {
     Iterator<DeleteRecord> it = 
Arrays.stream(deleteBlock.getRecordsToDelete()).iterator();
     while (it.hasNext()) {
       DeleteRecord record = it.next();
@@ -106,7 +112,7 @@ public class HoodieKeyBasedFileGroupRecordBuffer<T> extends 
HoodieBaseFileGroupR
   }
 
   @Override
-  public void processNextDeletedRecord(DeleteRecord deleteRecord, Object 
recordKey) {
+  public void processNextDeletedRecord(DeleteRecord deleteRecord, Serializable 
recordKey) {
     Pair<Option<T>, Map<String, Object>> existingRecordMetadataPair = 
records.get(recordKey);
     Option<DeleteRecord> recordOpt = doProcessNextDeletedRecord(deleteRecord, 
existingRecordMetadataPair);
     if (recordOpt.isPresent()) {
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
index de63b0fb2e3..4412713928f 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
@@ -29,11 +29,13 @@ import 
org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.ExternalSpillableMap;
 import org.apache.hudi.common.util.collection.Pair;
 
 import org.apache.avro.Schema;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
@@ -59,9 +61,13 @@ public class HoodiePositionBasedFileGroupRecordBuffer<T> 
extends HoodieBaseFileG
                                                   Option<String> 
partitionNameOverrideOpt,
                                                   Option<String[]> 
partitionPathFieldOpt,
                                                   HoodieRecordMerger 
recordMerger,
-                                                  TypedProperties 
payloadProps) {
+                                                  TypedProperties payloadProps,
+                                                  long maxMemorySizeInBytes,
+                                                  String spillableMapBasePath,
+                                                  
ExternalSpillableMap.DiskMapType diskMapType,
+                                                  boolean 
isBitCaskDiskMapCompressionEnabled) {
     super(readerContext, readerSchema, baseFileSchema, 
partitionNameOverrideOpt, partitionPathFieldOpt,
-        recordMerger, payloadProps);
+        recordMerger, payloadProps, maxMemorySizeInBytes, 
spillableMapBasePath, diskMapType, isBitCaskDiskMapCompressionEnabled);
   }
 
   @Override
@@ -114,7 +120,7 @@ public class HoodiePositionBasedFileGroupRecordBuffer<T> 
extends HoodieBaseFileG
   }
 
   @Override
-  public void processNextDataRecord(T record, Map<String, Object> metadata, 
Object recordPosition) throws IOException {
+  public void processNextDataRecord(T record, Map<String, Object> metadata, 
Serializable recordPosition) throws IOException {
     Pair<Option<T>, Map<String, Object>> existingRecordMetadataPair = 
records.get(recordPosition);
     Option<Pair<T, Map<String, Object>>> mergedRecordAndMetadata =
         doProcessNextDataRecord(record, metadata, existingRecordMetadataPair);
@@ -146,7 +152,7 @@ public class HoodiePositionBasedFileGroupRecordBuffer<T> 
extends HoodieBaseFileG
   }
 
   @Override
-  public void processNextDeletedRecord(DeleteRecord deleteRecord, Object 
recordPosition) {
+  public void processNextDeletedRecord(DeleteRecord deleteRecord, Serializable 
recordPosition) {
     Pair<Option<T>, Map<String, Object>> existingRecordMetadataPair = 
records.get(recordPosition);
     Option<DeleteRecord> recordOpt = doProcessNextDeletedRecord(deleteRecord, 
existingRecordMetadataPair);
     if (recordOpt.isPresent()) {
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordSizeEstimator.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordSizeEstimator.java
index 8c2514aa51f..f1e865aa281 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordSizeEstimator.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordSizeEstimator.java
@@ -18,7 +18,6 @@
 
 package org.apache.hudi.common.util;
 
-import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.collection.ExternalSpillableMap;
 
 import org.apache.avro.Schema;
@@ -30,7 +29,7 @@ import org.slf4j.LoggerFactory;
  * 
  * @param <T>
  */
-public class HoodieRecordSizeEstimator<T> implements 
SizeEstimator<HoodieRecord<T>> {
+public class HoodieRecordSizeEstimator<T> implements SizeEstimator<T> {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(HoodieRecordSizeEstimator.class);
 
@@ -41,7 +40,7 @@ public class HoodieRecordSizeEstimator<T> implements 
SizeEstimator<HoodieRecord<
   }
 
   @Override
-  public long sizeEstimate(HoodieRecord<T> hoodieRecord) {
+  public long sizeEstimate(T hoodieRecord) {
     // Most HoodieRecords are bound to have data + schema. Although, the same 
schema object is shared amongst
     // all records in the JVM. Calculate and print the size of the Schema and 
of the Record to
     // note the sizes and differences. A correct estimation in such cases is 
handled in
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
index 59c078e929b..5fc1895ef78 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
@@ -34,6 +34,7 @@ import 
org.apache.hudi.common.table.view.FileSystemViewManager;
 import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.common.table.view.SyncableFileSystemView;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.collection.ExternalSpillableMap;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 import org.apache.hudi.metadata.HoodieTableMetadata;
 
@@ -179,6 +180,10 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
         metaClient.getTableConfig(),
         0,
         fileSlice.getTotalFileSize(),
+        false,
+        1024 * 1024 * 1000,
+        metaClient.getTempFolderPath(),
+        ExternalSpillableMap.DiskMapType.ROCKS_DB,
         false);
     fileGroupReader.initRecordIterators();
     while (fileGroupReader.hasNext()) {
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileGroupReaderTestUtils.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileGroupReaderTestUtils.java
index 39ddc42bc37..add4032b6c2 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileGroupReaderTestUtils.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileGroupReaderTestUtils.java
@@ -23,8 +23,10 @@ import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.HoodieReaderContext;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.read.HoodieFileGroupReader;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ExternalSpillableMap;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.IndexedRecord;
@@ -121,7 +123,11 @@ public class HoodieFileGroupReaderTestUtils {
           tableConfig,
           start,
           length,
-          shouldUseRecordPosition);
+          shouldUseRecordPosition,
+          1024 * 1024 * 1000,
+          basePath + "/" + HoodieTableMetaClient.TEMPFOLDER_NAME,
+          ExternalSpillableMap.DiskMapType.ROCKS_DB,
+          false);
     }
   }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
index f9c3d008fd0..3de30c770c7 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
@@ -22,13 +22,17 @@ import org.apache.hadoop.fs.Path
 import org.apache.hudi.MergeOnReadSnapshotRelation.createPartitionedFile
 import org.apache.hudi.avro.AvroSchemaUtils
 import org.apache.hudi.cdc.{CDCFileGroupIterator, CDCRelation, 
HoodieCDCFileGroupSplit}
-import org.apache.hudi.common.config.TypedProperties
+import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMemoryConfig, 
HoodieStorageConfig, TypedProperties}
 import org.apache.hudi.common.engine.HoodieReaderContext
 import org.apache.hudi.common.model.{FileSlice, HoodieLogFile, HoodieRecord}
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
 import org.apache.hudi.common.table.read.HoodieFileGroupReader
 import org.apache.hudi.{AvroConversionUtils, HoodieFileIndex, 
HoodiePartitionCDCFileGroupMapping, HoodiePartitionFileSliceMapping, 
HoodieSparkUtils, HoodieTableSchema, HoodieTableState, SparkAdapterSupport, 
SparkFileFormatInternalRowReaderContext}
 import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.util.FileIOUtils
+import org.apache.hudi.common.util.collection.ExternalSpillableMap
+import org.apache.hudi.common.util.collection.ExternalSpillableMap.DiskMapType
+import org.apache.hudi.config.HoodieWriteConfig
 import 
org.apache.spark.sql.HoodieCatalystExpressionUtils.generateUnsafeProjection
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
@@ -41,6 +45,7 @@ import org.apache.spark.sql.types.{LongType, Metadata, 
MetadataBuilder, StringTy
 import org.apache.spark.util.SerializableConfiguration
 
 import java.io.Closeable
+import java.util.Locale
 import scala.annotation.tailrec
 import scala.collection.mutable
 import scala.jdk.CollectionConverters.asScalaIteratorConverter
@@ -118,7 +123,7 @@ class 
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
     val broadcastedHadoopConf = spark.sparkContext.broadcast(new 
SerializableConfiguration(augmentedHadoopConf))
     val broadcastedDataSchema =  spark.sparkContext.broadcast(dataAvroSchema)
     val broadcastedRequestedSchema =  
spark.sparkContext.broadcast(requestedAvroSchema)
-    val props: TypedProperties = HoodieFileIndex.getConfigProperties(spark, 
options)
+    val fileIndexProps: TypedProperties = 
HoodieFileIndex.getConfigProperties(spark, options)
 
     (file: PartitionedFile) => {
       file.partitionValues match {
@@ -153,7 +158,11 @@ class 
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
                   metaClient.getTableConfig,
                   file.start,
                   file.length,
-                  shouldUseRecordPosition)
+                  shouldUseRecordPosition,
+                  
options.getOrElse(HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE.key(), 
HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE.defaultValue() + "").toLong,
+                  
options.getOrElse(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.key(), 
FileIOUtils.getDefaultSpillableMapBasePath),
+                  
DiskMapType.valueOf(options.getOrElse(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key(),
 
HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue().name()).toUpperCase(Locale.ROOT)),
+                  
options.getOrElse(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(),
 
HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue().toString).toBoolean)
                 reader.initRecordIterators()
                 // Append partition values to rows and project to output schema
                 appendPartitionAndProject(
@@ -172,7 +181,7 @@ class 
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
           val fileSplits = 
hoodiePartitionCDCFileGroupSliceMapping.getFileSplits().toArray
           val fileGroupSplit: HoodieCDCFileGroupSplit = 
HoodieCDCFileGroupSplit(fileSplits)
           buildCDCRecordIterator(
-            fileGroupSplit, cdcFileReader, broadcastedHadoopConf.value.value, 
props, requiredSchema)
+            fileGroupSplit, cdcFileReader, broadcastedHadoopConf.value.value, 
fileIndexProps, requiredSchema)
         // TODO: Use FileGroupReader here: HUDI-6942.
         case _ => baseFileReader(file)
       }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodiePositionBasedFileGroupRecordBuffer.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodiePositionBasedFileGroupRecordBuffer.java
index 682558f79f9..da410f1f4a7 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodiePositionBasedFileGroupRecordBuffer.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodiePositionBasedFileGroupRecordBuffer.java
@@ -35,6 +35,7 @@ import 
org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.testutils.SchemaTestUtil;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.ExternalSpillableMap;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieValidationException;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
@@ -103,7 +104,11 @@ public class TestHoodiePositionBasedFileGroupRecordBuffer 
extends TestHoodieFile
         partitionNameOpt,
         partitionFields,
         useCustomMerger ? new CustomMerger() : new HoodieSparkRecordMerger(),
-        new TypedProperties());
+        new TypedProperties(),
+        1024 * 1024 * 1000,
+        metaClient.getTempFolderPath(),
+        ExternalSpillableMap.DiskMapType.ROCKS_DB,
+        false);
   }
 
   public Map<HoodieLogBlock.HeaderMetadataType, String> getHeader() {

Reply via email to