This is an automated email from the ASF dual-hosted git repository.

jonvex pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new bf1df335442 [HUDI-7876] use properties to store log spill map configs for fg reader (#11455)
bf1df335442 is described below

commit bf1df335442d38932cf7f8c6ef4228c316278569
Author: Jon Vexler <[email protected]>
AuthorDate: Tue Jun 18 12:30:56 2024 -0400

    [HUDI-7876] use properties to store log spill map configs for fg reader (#11455)
    
    * use properties to store log spill map configs for fg reader
    
    * use constant for the max buffer size
    
    * rename payloadProps to props
    
    ---------
    
    Co-authored-by: Jonathan Vexler <=>
---
 .../read/HoodieBaseFileGroupRecordBuffer.java      | 46 +++++++++++++---------
 .../common/table/read/HoodieFileGroupReader.java   | 13 ++----
 .../read/HoodieKeyBasedFileGroupRecordBuffer.java  | 10 +----
 .../HoodiePositionBasedFileGroupRecordBuffer.java  | 10 +----
 .../read/HoodieUnmergedFileGroupRecordBuffer.java  | 10 +----
 .../table/read/TestHoodieFileGroupReaderBase.java  |  9 +++--
 .../reader/HoodieFileGroupReaderTestUtils.java     | 12 +++---
 .../HoodieFileGroupReaderBasedRecordReader.java    | 24 +++++------
 ...odieFileGroupReaderBasedParquetFileFormat.scala | 15 +++----
 ...stHoodiePositionBasedFileGroupRecordBuffer.java | 11 +++---
 10 files changed, 68 insertions(+), 92 deletions(-)

diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
index 88ec42673ac..aea50e44fbe 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
@@ -29,6 +29,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.log.KeySpec;
 import org.apache.hudi.common.table.log.block.HoodieDataBlock;
 import org.apache.hudi.common.util.DefaultSizeEstimator;
+import org.apache.hudi.common.util.FileIOUtils;
 import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
 import org.apache.hudi.common.util.InternalSchemaCache;
 import org.apache.hudi.common.util.Option;
@@ -50,9 +51,14 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.util.Collections;
 import java.util.Iterator;
+import java.util.Locale;
 import java.util.Map;
 import java.util.function.Function;
 
+import static 
org.apache.hudi.common.config.HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED;
+import static 
org.apache.hudi.common.config.HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE;
+import static 
org.apache.hudi.common.config.HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE;
+import static 
org.apache.hudi.common.config.HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH;
 import static 
org.apache.hudi.common.engine.HoodieReaderContext.INTERNAL_META_SCHEMA;
 import static 
org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.INSTANT_TIME;
 import static 
org.apache.hudi.common.table.read.HoodieFileGroupReader.getRecordMergeMode;
@@ -64,7 +70,7 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> 
implements HoodieFileGr
   protected final Option<String[]> partitionPathFieldOpt;
   protected final RecordMergeMode recordMergeMode;
   protected final HoodieRecordMerger recordMerger;
-  protected final TypedProperties payloadProps;
+  protected final TypedProperties props;
   protected final ExternalSpillableMap<Serializable, Pair<Option<T>, 
Map<String, Object>>> records;
   protected ClosableIterator<T> baseFileIterator;
   protected Iterator<Pair<Option<T>, Map<String, Object>>> logRecordIterator;
@@ -78,24 +84,26 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> 
implements HoodieFileGr
                                          Option<String> 
partitionNameOverrideOpt,
                                          Option<String[]> 
partitionPathFieldOpt,
                                          HoodieRecordMerger recordMerger,
-                                         TypedProperties payloadProps,
-                                         long maxMemorySizeInBytes,
-                                         String spillableMapBasePath,
-                                         ExternalSpillableMap.DiskMapType 
diskMapType,
-                                         boolean 
isBitCaskDiskMapCompressionEnabled) {
+                                         TypedProperties props) {
     this.readerContext = readerContext;
     this.readerSchema = readerContext.getSchemaHandler().getRequiredSchema();
     this.partitionNameOverrideOpt = partitionNameOverrideOpt;
     this.partitionPathFieldOpt = partitionPathFieldOpt;
-    this.recordMergeMode = getRecordMergeMode(payloadProps);
+    this.recordMergeMode = getRecordMergeMode(props);
     this.recordMerger = recordMerger;
     //Custom merge mode should produce the same results for any merger so we 
won't fail if there is a mismatch
     if (recordMerger.getRecordMergeMode() != this.recordMergeMode && 
this.recordMergeMode != RecordMergeMode.CUSTOM) {
       throw new IllegalStateException("Record merger is " + 
recordMerger.getClass().getName() + " but merge mode is " + 
this.recordMergeMode);
     }
-    this.payloadProps = payloadProps;
+    this.props = props;
     this.internalSchema = readerContext.getSchemaHandler().getInternalSchema();
     this.hoodieTableMetaClient = hoodieTableMetaClient;
+    long maxMemorySizeInBytes = props.getLong(MAX_MEMORY_FOR_MERGE.key(), 
MAX_MEMORY_FOR_MERGE.defaultValue());
+    String spillableMapBasePath = 
props.getString(SPILLABLE_MAP_BASE_PATH.key(), 
FileIOUtils.getDefaultSpillableMapBasePath());
+    ExternalSpillableMap.DiskMapType diskMapType = 
ExternalSpillableMap.DiskMapType.valueOf(props.getString(SPILLABLE_DISK_MAP_TYPE.key(),
+        
SPILLABLE_DISK_MAP_TYPE.defaultValue().name()).toUpperCase(Locale.ROOT));
+    boolean isBitCaskDiskMapCompressionEnabled = 
props.getBoolean(DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(),
+        DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue());
     try {
       // Store merged records for all versions for this log file, set the 
in-memory footprint to maxInMemoryMapSize
       this.records = new ExternalSpillableMap<>(maxMemorySizeInBytes, 
spillableMapBasePath, new DefaultSizeEstimator<>(),
@@ -213,7 +221,7 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> 
implements HoodieFileGr
                 existingRecordMetadataPair.getLeft(), 
existingRecordMetadataPair.getRight()),
             (Schema) 
existingRecordMetadataPair.getRight().get(INTERNAL_META_SCHEMA),
             readerSchema,
-            payloadProps);
+            props);
         if (!combinedRecordAndSchemaOpt.isPresent()) {
           return Option.empty();
         }
@@ -233,12 +241,12 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> 
implements HoodieFileGr
             return Option.empty();
           case EVENT_TIME_ORDERING:
             Comparable existingOrderingValue = readerContext.getOrderingValue(
-                existingRecordMetadataPair.getLeft(), 
existingRecordMetadataPair.getRight(), readerSchema, payloadProps);
+                existingRecordMetadataPair.getLeft(), 
existingRecordMetadataPair.getRight(), readerSchema, props);
             if 
(isDeleteRecordWithNaturalOrder(existingRecordMetadataPair.getLeft(), 
existingOrderingValue)) {
               return Option.empty();
             }
             Comparable incomingOrderingValue = readerContext.getOrderingValue(
-                Option.of(record), metadata, readerSchema, payloadProps);
+                Option.of(record), metadata, readerSchema, props);
             if (compareTo(readerContext, incomingOrderingValue, 
existingOrderingValue) > 0) {
               return Option.of(Pair.of(record, metadata));
             }
@@ -254,7 +262,7 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> 
implements HoodieFileGr
                 readerContext.constructHoodieRecord(
                     existingRecordMetadataPair.getLeft(), 
existingRecordMetadataPair.getRight()),
                 (Schema) 
existingRecordMetadataPair.getRight().get(INTERNAL_META_SCHEMA),
-                payloadProps);
+                props);
 
             if (!combinedRecordAndSchemaOpt.isPresent()) {
               return Option.empty();
@@ -297,7 +305,7 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> 
implements HoodieFileGr
         default:
           Comparable existingOrderingVal = readerContext.getOrderingValue(
               existingRecordMetadataPair.getLeft(), 
existingRecordMetadataPair.getRight(), readerSchema,
-              payloadProps);
+              props);
           if 
(isDeleteRecordWithNaturalOrder(existingRecordMetadataPair.getLeft(), 
existingOrderingVal)) {
             return Option.empty();
           }
@@ -383,10 +391,10 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> 
implements HoodieFileGr
       Option<Pair<HoodieRecord, Schema>> mergedRecord = 
recordMerger.partialMerge(
           readerContext.constructHoodieRecord(older, olderInfoMap), (Schema) 
olderInfoMap.get(INTERNAL_META_SCHEMA),
           readerContext.constructHoodieRecord(newer, newerInfoMap), (Schema) 
newerInfoMap.get(INTERNAL_META_SCHEMA),
-          readerSchema, payloadProps);
+          readerSchema, props);
 
       if (mergedRecord.isPresent()
-          && 
!mergedRecord.get().getLeft().isDelete(mergedRecord.get().getRight(), 
payloadProps)) {
+          && 
!mergedRecord.get().getLeft().isDelete(mergedRecord.get().getRight(), props)) {
         if (!mergedRecord.get().getRight().equals(readerSchema)) {
           return Option.ofNullable((T) 
mergedRecord.get().getLeft().rewriteRecordWithNewSchema(mergedRecord.get().getRight(),
 null, readerSchema).getData());
         }
@@ -399,12 +407,12 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> 
implements HoodieFileGr
           return newer;
         case EVENT_TIME_ORDERING:
           Comparable oldOrderingValue = readerContext.getOrderingValue(
-              older, olderInfoMap, readerSchema, payloadProps);
+              older, olderInfoMap, readerSchema, props);
           if (isDeleteRecordWithNaturalOrder(older, oldOrderingValue)) {
             return newer;
           }
           Comparable newOrderingValue = readerContext.getOrderingValue(
-              newer, newerInfoMap, readerSchema, payloadProps);
+              newer, newerInfoMap, readerSchema, props);
           if (isDeleteRecordWithNaturalOrder(newer, newOrderingValue)) {
             return Option.empty();
           }
@@ -416,10 +424,10 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> 
implements HoodieFileGr
         default:
           Option<Pair<HoodieRecord, Schema>> mergedRecord = recordMerger.merge(
               readerContext.constructHoodieRecord(older, olderInfoMap), 
(Schema) olderInfoMap.get(INTERNAL_META_SCHEMA),
-              readerContext.constructHoodieRecord(newer, newerInfoMap), 
(Schema) newerInfoMap.get(INTERNAL_META_SCHEMA), payloadProps);
+              readerContext.constructHoodieRecord(newer, newerInfoMap), 
(Schema) newerInfoMap.get(INTERNAL_META_SCHEMA), props);
 
           if (mergedRecord.isPresent()
-              && 
!mergedRecord.get().getLeft().isDelete(mergedRecord.get().getRight(), 
payloadProps)) {
+              && 
!mergedRecord.get().getLeft().isDelete(mergedRecord.get().getRight(), props)) {
             if (!mergedRecord.get().getRight().equals(readerSchema)) {
               return Option.ofNullable((T) 
mergedRecord.get().getLeft().rewriteRecordWithNewSchema(mergedRecord.get().getRight(),
 null, readerSchema).getData());
             }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
index 97d655a9e47..1c59e8f0ba2 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
@@ -36,7 +36,6 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.CachingIterator;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.EmptyIterator;
-import org.apache.hudi.common.util.collection.ExternalSpillableMap;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.internal.schema.InternalSchema;
@@ -94,11 +93,7 @@ public final class HoodieFileGroupReader<T> implements 
Closeable {
                                TypedProperties props,
                                long start,
                                long length,
-                               boolean shouldUseRecordPosition,
-                               long maxMemorySizeInBytes,
-                               String spillableMapBasePath,
-                               ExternalSpillableMap.DiskMapType diskMapType,
-                               boolean isBitCaskDiskMapCompressionEnabled) {
+                               boolean shouldUseRecordPosition) {
     this.readerContext = readerContext;
     this.storage = storage;
     this.hoodieBaseFileOption = fileSlice.getBaseFile();
@@ -122,10 +117,8 @@ public final class HoodieFileGroupReader<T> implements 
Closeable {
     this.recordBuffer = this.logFiles.isEmpty()
         ? null
         : shouldUseRecordPosition
-        ? new HoodiePositionBasedFileGroupRecordBuffer<>(readerContext, 
hoodieTableMetaClient, Option.empty(),
-        Option.empty(), recordMerger, props, maxMemorySizeInBytes, 
spillableMapBasePath, diskMapType, isBitCaskDiskMapCompressionEnabled)
-        : new HoodieKeyBasedFileGroupRecordBuffer<>(readerContext, 
hoodieTableMetaClient, Option.empty(),
-        Option.empty(), recordMerger, props, maxMemorySizeInBytes, 
spillableMapBasePath, diskMapType, isBitCaskDiskMapCompressionEnabled);
+        ? new HoodiePositionBasedFileGroupRecordBuffer<>(readerContext, 
hoodieTableMetaClient, Option.empty(), Option.empty(), recordMerger, props)
+        : new HoodieKeyBasedFileGroupRecordBuffer<>(readerContext, 
hoodieTableMetaClient, Option.empty(), Option.empty(), recordMerger, props);
   }
 
   /**
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java
index d311923c625..1c53bfedf3d 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java
@@ -30,7 +30,6 @@ import 
org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.ClosableIterator;
-import org.apache.hudi.common.util.collection.ExternalSpillableMap;
 import org.apache.hudi.common.util.collection.Pair;
 
 import org.apache.avro.Schema;
@@ -54,13 +53,8 @@ public class HoodieKeyBasedFileGroupRecordBuffer<T> extends 
HoodieBaseFileGroupR
                                              Option<String> 
partitionNameOverrideOpt,
                                              Option<String[]> 
partitionPathFieldOpt,
                                              HoodieRecordMerger recordMerger,
-                                             TypedProperties payloadProps,
-                                             long maxMemorySizeInBytes,
-                                             String spillableMapBasePath,
-                                             ExternalSpillableMap.DiskMapType 
diskMapType,
-                                             boolean 
isBitCaskDiskMapCompressionEnabled) {
-    super(readerContext, hoodieTableMetaClient, partitionNameOverrideOpt, 
partitionPathFieldOpt,
-        recordMerger, payloadProps, maxMemorySizeInBytes, 
spillableMapBasePath, diskMapType, isBitCaskDiskMapCompressionEnabled);
+                                             TypedProperties props) {
+    super(readerContext, hoodieTableMetaClient, partitionNameOverrideOpt, 
partitionPathFieldOpt, recordMerger, props);
   }
 
   @Override
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
index 79011a81d6b..03ee23b91d5 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
@@ -30,7 +30,6 @@ import 
org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ClosableIterator;
-import org.apache.hudi.common.util.collection.ExternalSpillableMap;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieKeyException;
 
@@ -72,13 +71,8 @@ public class HoodiePositionBasedFileGroupRecordBuffer<T> 
extends HoodieKeyBasedF
                                                   Option<String> 
partitionNameOverrideOpt,
                                                   Option<String[]> 
partitionPathFieldOpt,
                                                   HoodieRecordMerger 
recordMerger,
-                                                  TypedProperties payloadProps,
-                                                  long maxMemorySizeInBytes,
-                                                  String spillableMapBasePath,
-                                                  
ExternalSpillableMap.DiskMapType diskMapType,
-                                                  boolean 
isBitCaskDiskMapCompressionEnabled) {
-    super(readerContext, hoodieTableMetaClient, partitionNameOverrideOpt, 
partitionPathFieldOpt,
-        recordMerger, payloadProps, maxMemorySizeInBytes, 
spillableMapBasePath, diskMapType, isBitCaskDiskMapCompressionEnabled);
+                                                  TypedProperties props) {
+    super(readerContext, hoodieTableMetaClient, partitionNameOverrideOpt, 
partitionPathFieldOpt, recordMerger, props);
   }
 
   @Override
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieUnmergedFileGroupRecordBuffer.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieUnmergedFileGroupRecordBuffer.java
index 8338c96f5a0..f766645999d 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieUnmergedFileGroupRecordBuffer.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieUnmergedFileGroupRecordBuffer.java
@@ -30,7 +30,6 @@ import 
org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.ClosableIterator;
-import org.apache.hudi.common.util.collection.ExternalSpillableMap;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 
@@ -53,13 +52,8 @@ public class HoodieUnmergedFileGroupRecordBuffer<T> extends 
HoodieBaseFileGroupR
       Option<String> partitionNameOverrideOpt,
       Option<String[]> partitionPathFieldOpt,
       HoodieRecordMerger recordMerger,
-      TypedProperties payloadProps,
-      long maxMemorySizeInBytes,
-      String spillableMapBasePath,
-      ExternalSpillableMap.DiskMapType diskMapType,
-      boolean isBitCaskDiskMapCompressionEnabled) {
-    super(readerContext, hoodieTableMetaClient, partitionNameOverrideOpt, 
partitionPathFieldOpt, recordMerger,
-        payloadProps, maxMemorySizeInBytes, spillableMapBasePath, diskMapType, 
isBitCaskDiskMapCompressionEnabled);
+      TypedProperties props) {
+    super(readerContext, hoodieTableMetaClient, partitionNameOverrideOpt, 
partitionPathFieldOpt, recordMerger, props);
   }
 
   @Override
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
index b1bb2dfe545..6c7b3400427 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
@@ -20,6 +20,7 @@
 package org.apache.hudi.common.table.read;
 
 import org.apache.hudi.common.config.HoodieCommonConfig;
+import org.apache.hudi.common.config.HoodieMemoryConfig;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.config.RecordMergeMode;
@@ -276,6 +277,10 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
     props.setProperty("hoodie.payload.ordering.field", "timestamp");
     props.setProperty(RECORD_MERGER_STRATEGY.key(), 
RECORD_MERGER_STRATEGY.defaultValue());
     props.setProperty(RECORD_MERGE_MODE.key(), recordMergeMode.name());
+    props.setProperty(HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE.key(), 
String.valueOf(HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE.defaultValue()));
+    props.setProperty(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.key(), 
metaClient.getTempFolderPath());
+    props.setProperty(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key(), 
ExternalSpillableMap.DiskMapType.ROCKS_DB.name());
+    
props.setProperty(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(),
 "false");
     if (metaClient.getTableConfig().contains(PARTITION_FIELDS)) {
       props.setProperty(PARTITION_FIELDS.key(), 
metaClient.getTableConfig().getString(PARTITION_FIELDS));
     }
@@ -293,10 +298,6 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
         props,
         0,
         fileSlice.getTotalFileSize(),
-        false,
-        1024 * 1024 * 1000,
-        metaClient.getTempFolderPath(),
-        ExternalSpillableMap.DiskMapType.ROCKS_DB,
         false);
     fileGroupReader.initRecordIterators();
     while (fileGroupReader.hasNext()) {
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileGroupReaderTestUtils.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileGroupReaderTestUtils.java
index 998bd4ab079..e3faae07fa0 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileGroupReaderTestUtils.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileGroupReaderTestUtils.java
@@ -19,6 +19,8 @@
 
 package org.apache.hudi.common.testutils.reader;
 
+import org.apache.hudi.common.config.HoodieCommonConfig;
+import org.apache.hudi.common.config.HoodieMemoryConfig;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.HoodieReaderContext;
 import org.apache.hudi.common.model.FileSlice;
@@ -102,6 +104,10 @@ public class HoodieFileGroupReaderTestUtils {
         boolean shouldUseRecordPosition,
         HoodieTableMetaClient metaClient
     ) {
+      
props.setProperty(HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE.key(),String.valueOf(1024
 * 1024 * 1000));
+      props.setProperty(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.key(),  
basePath + "/" + HoodieTableMetaClient.TEMPFOLDER_NAME);
+      props.setProperty(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key(), 
ExternalSpillableMap.DiskMapType.ROCKS_DB.name());
+      
props.setProperty(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(),
 "false");
       return new HoodieFileGroupReader<>(
           readerContext,
           storage,
@@ -115,11 +121,7 @@ public class HoodieFileGroupReaderTestUtils {
           props,
           start,
           length,
-          shouldUseRecordPosition,
-          1024 * 1024 * 1000,
-          basePath + "/" + HoodieTableMetaClient.TEMPFOLDER_NAME,
-          ExternalSpillableMap.DiskMapType.ROCKS_DB,
-          false);
+          shouldUseRecordPosition);
     }
   }
 }
diff --git 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileGroupReaderBasedRecordReader.java
 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileGroupReaderBasedRecordReader.java
index acd499a8c60..e26e4b0efa9 100644
--- 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileGroupReaderBasedRecordReader.java
+++ 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileGroupReaderBasedRecordReader.java
@@ -20,6 +20,7 @@
 package org.apache.hudi.hadoop;
 
 import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.BaseFile;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieBaseFile;
@@ -28,10 +29,8 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.read.HoodieFileGroupReader;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.util.FileIOUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
-import org.apache.hudi.common.util.collection.ExternalSpillableMap;
 import org.apache.hudi.hadoop.realtime.RealtimeSplit;
 import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils;
 import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
@@ -64,10 +63,6 @@ import java.util.function.UnaryOperator;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import static 
org.apache.hudi.common.config.HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED;
-import static 
org.apache.hudi.common.config.HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE;
-import static 
org.apache.hudi.common.config.HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE;
-import static 
org.apache.hudi.common.config.HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH;
 import static org.apache.hudi.common.fs.FSUtils.getCommitTime;
 import static org.apache.hudi.common.fs.FSUtils.getFileId;
 import static org.apache.hudi.common.util.StringUtils.EMPTY_STRING;
@@ -125,18 +120,17 @@ public class HoodieFileGroupReaderBasedRecordReader 
implements RecordReader<Null
     Map<String, String[]> hosts = new HashMap<>();
     this.readerContext = new HiveHoodieReaderContext(readerCreator, split, 
jobConfCopy, reporter, tableSchema, hosts, metaClient);
     this.arrayWritable = new ArrayWritable(Writable.class, new 
Writable[requestedSchema.getFields().size()]);
-    // get some config values
-    long maxMemoryForMerge = jobConf.getLong(MAX_MEMORY_FOR_MERGE.key(), 
MAX_MEMORY_FOR_MERGE.defaultValue());
-    String spillableMapPath = jobConf.get(SPILLABLE_MAP_BASE_PATH.key(), 
FileIOUtils.getDefaultSpillableMapBasePath());
-    ExternalSpillableMap.DiskMapType spillMapType = 
ExternalSpillableMap.DiskMapType.valueOf(jobConf.get(SPILLABLE_DISK_MAP_TYPE.key(),
-        
SPILLABLE_DISK_MAP_TYPE.defaultValue().name()).toUpperCase(Locale.ROOT));
-    boolean bitmaskCompressEnabled = 
jobConf.getBoolean(DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(),
-        DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue());
+    TypedProperties props = metaClient.getTableConfig().getProps();
+    jobConf.forEach(e -> {
+      if (e.getKey().startsWith("hoodie")) {
+        props.setProperty(e.getKey(), e.getValue());
+      }
+    });
     LOG.debug("Creating HoodieFileGroupReaderRecordReader with 
tableBasePath={}, latestCommitTime={}, fileSplit={}", tableBasePath, 
latestCommitTime, fileSplit.getPath());
     this.fileGroupReader = new HoodieFileGroupReader<>(readerContext, 
metaClient.getStorage(), tableBasePath,
         latestCommitTime, getFileSliceFromSplit(fileSplit, hosts, 
getFs(tableBasePath, jobConfCopy), tableBasePath),
-        tableSchema, requestedSchema, Option.empty(), metaClient, 
metaClient.getTableConfig().getProps(), fileSplit.getStart(),
-        fileSplit.getLength(), false, maxMemoryForMerge, spillableMapPath, 
spillMapType, bitmaskCompressEnabled);
+        tableSchema, requestedSchema, Option.empty(), metaClient, props, 
fileSplit.getStart(),
+        fileSplit.getLength(), false);
     this.fileGroupReader.initRecordIterators();
     // it expects the partition columns to be at the end
     Schema outputSchema = HoodieAvroUtils.generateProjectionSchema(tableSchema,
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
index 318aa286525..71dbf9ae017 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
@@ -23,12 +23,10 @@ import 
org.apache.hudi.MergeOnReadSnapshotRelation.createPartitionedFile
 import org.apache.hudi.avro.AvroSchemaUtils
 import org.apache.hudi.cdc.{CDCFileGroupIterator, CDCRelation, 
HoodieCDCFileGroupSplit}
 import org.apache.hudi.client.utils.SparkInternalSchemaConverter
-import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMemoryConfig, 
TypedProperties}
+import org.apache.hudi.common.config.TypedProperties
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.table.read.HoodieFileGroupReader
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
-import org.apache.hudi.common.util.FileIOUtils
-import org.apache.hudi.common.util.collection.ExternalSpillableMap.DiskMapType
 import org.apache.hudi.internal.schema.InternalSchema
 import org.apache.hudi.internal.schema.utils.SerDeHelper
 import org.apache.hudi.storage.StorageConfiguration
@@ -44,7 +42,6 @@ import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.StructType
 
 import java.io.Closeable
-import java.util.Locale
 
 trait HoodieFormatTrait {
 
@@ -153,6 +150,8 @@ class 
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
                 val storageConf = broadcastedStorageConf.value
                 val metaClient: HoodieTableMetaClient = HoodieTableMetaClient
                   
.builder().setConf(storageConf).setBasePath(tableState.tablePath).build
+                val props = metaClient.getTableConfig.getProps
+                options.foreach(kv => props.setProperty(kv._1, kv._2))
                 val reader = new HoodieFileGroupReader[InternalRow](
                   readerContext,
                   new HoodieHadoopStorage(metaClient.getBasePath, storageConf),
@@ -163,14 +162,10 @@ class 
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
                   broadcastedRequestedSchema.value,
                   internalSchemaOpt,
                   metaClient,
-                  metaClient.getTableConfig.getProps,
+                  props,
                   file.start,
                   file.length,
-                  shouldUseRecordPosition,
-                  
options.getOrElse(HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE.key(), 
HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE.defaultValue() + "").toLong,
-                  
options.getOrElse(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.key(), 
FileIOUtils.getDefaultSpillableMapBasePath),
-                  
DiskMapType.valueOf(options.getOrElse(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key(),
 
HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue().name()).toUpperCase(Locale.ROOT)),
-                  
options.getOrElse(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(),
 
HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue().toString).toBoolean)
+                  shouldUseRecordPosition)
                 reader.initRecordIterators()
                 // Append partition values to rows and project to output schema
                 appendPartitionAndProject(
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodiePositionBasedFileGroupRecordBuffer.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodiePositionBasedFileGroupRecordBuffer.java
index 62a84bb6f8b..edcc1f6dad2 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodiePositionBasedFileGroupRecordBuffer.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodiePositionBasedFileGroupRecordBuffer.java
@@ -20,6 +20,7 @@
 package org.apache.hudi;
 
 import org.apache.hudi.common.config.HoodieCommonConfig;
+import org.apache.hudi.common.config.HoodieMemoryConfig;
 import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.config.TypedProperties;
@@ -125,17 +126,17 @@ public class TestHoodiePositionBasedFileGroupRecordBuffer 
extends TestHoodieFile
         Option.empty(), metaClient.getTableConfig()));
     TypedProperties props = new TypedProperties();
     props.put(HoodieCommonConfig.RECORD_MERGE_MODE.key(), mergeMode.name());
+    
props.setProperty(HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE.key(),String.valueOf(HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE.defaultValue()));
+    props.setProperty(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.key(), 
metaClient.getTempFolderPath());
+    props.setProperty(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key(), 
ExternalSpillableMap.DiskMapType.ROCKS_DB.name());
+    
props.setProperty(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(),
 "false");
     buffer = new HoodiePositionBasedFileGroupRecordBuffer<>(
         ctx,
         metaClient,
         partitionNameOpt,
         partitionFields,
         ctx.getRecordMerger(),
-        props,
-        1024 * 1024 * 1000,
-        metaClient.getTempFolderPath(),
-        ExternalSpillableMap.DiskMapType.ROCKS_DB,
-        false);
+        props);
   }
 
   public Map<HoodieLogBlock.HeaderMetadataType, String> getHeader() {

Reply via email to