This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new c9069fe57948 fix: Allow HFile Cache reader configuration to be configurable (#13927)
c9069fe57948 is described below

commit c9069fe5794840971c1710034b08a0f36f53518e
Author: voonhous <[email protected]>
AuthorDate: Tue Sep 30 00:40:39 2025 +0800

    fix: Allow HFile Cache reader configuration to be configurable (#13927)
    
    Co-authored-by: Y Ethan Guo <[email protected]>
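    
    For context, this commit removes the internal flag _hoodie.hfile.use.native.reader and
    instead threads reader properties, including the HFile block cache settings, through
    HoodieAvroReaderContext and the reader context factories. A minimal sketch of the new
    constructor added below -- the helper class is hypothetical, a HoodieTableMetaClient is
    assumed to be in hand, and the property values are illustrative, not defaults:
    
        import org.apache.avro.generic.IndexedRecord;
        import org.apache.hudi.avro.HoodieAvroReaderContext;
        import org.apache.hudi.common.config.HoodieReaderConfig;
        import org.apache.hudi.common.config.TypedProperties;
        import org.apache.hudi.common.engine.HoodieReaderContext;
        import org.apache.hudi.common.table.HoodieTableMetaClient;
        import org.apache.hudi.common.util.Option;
    
        final class ReaderContextSketch {
          static HoodieReaderContext<IndexedRecord> contextWithCacheProps(HoodieTableMetaClient metaClient) {
            TypedProperties props = new TypedProperties();
            // HFile block cache settings now travel with the reader properties.
            props.setProperty(HoodieReaderConfig.HFILE_BLOCK_CACHE_ENABLED.key(), "true");
            props.setProperty(HoodieReaderConfig.HFILE_BLOCK_CACHE_SIZE.key(), "200");
            return new HoodieAvroReaderContext(
                metaClient.getStorageConf(),   // storage configuration
                metaClient.getTableConfig(),   // table configuration
                Option.empty(),                // no instant range filter
                Option.empty(),                // no record key filter
                props);                        // reader configs, e.g. HFile cache settings
          }
        }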
---
 .../hudi/cli/commands/HoodieLogFileCommand.java    | 11 ++--
 .../org/apache/hudi/io/HoodieAppendHandle.java     |  4 +-
 .../hudi/metadata/HoodieMetadataWriteUtils.java    |  8 +++
 .../hudi/table/action/compact/HoodieCompactor.java | 12 ++++-
 .../client/common/HoodieFlinkEngineContext.java    |  2 +-
 .../client/common/HoodieJavaEngineContext.java     |  2 +-
 .../hudi/client/TestJavaHoodieBackedMetadata.java  |  3 +-
 .../client/common/HoodieSparkEngineContext.java    |  2 +-
 .../apache/hudi/avro/HoodieAvroReaderContext.java  | 33 ++++++++----
 .../hudi/common/config/HoodieReaderConfig.java     |  6 ---
 .../common/engine/AvroReaderContextFactory.java    | 11 ++--
 .../hudi/common/engine/HoodieEngineContext.java    |  2 +-
 .../common/engine/HoodieLocalEngineContext.java    |  2 +-
 .../hudi/common/engine/HoodieReaderContext.java    | 18 ++++++-
 .../hudi/common/table/log/HoodieLogFileReader.java |  5 +-
 .../table/log/block/HoodieHFileDataBlock.java      | 25 +++------
 .../org/apache/hudi/common/util/ConfigUtils.java   | 60 ++++++++++++++++++++--
 .../hudi/io/storage/HoodieFileReaderFactory.java   |  4 --
 .../hudi/metadata/HoodieBackedTableMetadata.java   | 60 ++++------------------
 .../testutils/reader/HoodieFileSliceTestUtils.java |  4 +-
 .../common/testutils/HoodieCommonTestHarness.java  |  3 +-
 .../hudi/hadoop/testutils/InputFormatTestUtil.java |  3 +-
 .../scala/org/apache/hudi/HoodieBaseRelation.scala |  6 +--
 .../hudi/functional/RecordLevelIndexTestBase.scala |  6 +--
 .../hudi/functional/TestRecordLevelIndex.scala     |  1 +
 .../utilities/HoodieMetadataTableValidator.java    |  7 +--
 26 files changed, 163 insertions(+), 137 deletions(-)

diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
index 9dea9b8fa2ba..4549050d9a3e 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
@@ -231,16 +231,17 @@ public class HoodieLogFileCommand {
     List<IndexedRecord> allRecords = new ArrayList<>();
     if (shouldMerge) {
       System.out.println("===========================> MERGING RECORDS 
<===================");
-      HoodieReaderContext<IndexedRecord> readerContext = new 
HoodieAvroReaderContext(
-          HoodieCLI.getTableMetaClient().getStorage().getConf(),
-          HoodieCLI.getTableMetaClient().getTableConfig(),
-          Option.empty(),
-          Option.empty());
       StoragePath firstLogFile = new StoragePath(logFilePaths.get(0));
       HoodieFileGroupId fileGroupId = new 
HoodieFileGroupId(FSUtils.getRelativePartitionPath(HoodieCLI.getTableMetaClient().getBasePath(),
 firstLogFile), FSUtils.getFileIdFromLogPath(firstLogFile));
       FileSlice fileSlice = new FileSlice(fileGroupId, 
HoodieTimeline.INIT_INSTANT_TS, null, logFilePaths.stream()
           .map(l -> new HoodieLogFile(new 
StoragePath(l))).collect(Collectors.toList()));
       TypedProperties fileGroupReaderProperties = 
buildFileGroupReaderProperties();
+      HoodieReaderContext<IndexedRecord> readerContext = new 
HoodieAvroReaderContext(
+          HoodieCLI.getTableMetaClient().getStorage().getConf(),
+          HoodieCLI.getTableMetaClient().getTableConfig(),
+          Option.empty(),
+          Option.empty(),
+          fileGroupReaderProperties);
       try (HoodieFileGroupReader<IndexedRecord> fileGroupReader = 
HoodieFileGroupReader.<IndexedRecord>newBuilder()
           .withReaderContext(readerContext)
           .withHoodieTableMetaClient(HoodieCLI.getTableMetaClient())
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
index eff2954ac05c..5b49cc4c8f8e 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
@@ -20,7 +20,6 @@ package org.apache.hudi.io;
 
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.config.HoodieReaderConfig;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.fs.FSUtils;
@@ -720,8 +719,7 @@ public class HoodieAppendHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O
         // Not supporting positions in HFile data blocks
         header.remove(HeaderMetadataType.BASE_FILE_INSTANT_TIME_OF_RECORD_POSITIONS);
         return new HoodieHFileDataBlock(
-            records, header, writeConfig.getHFileCompressionAlgorithm(), new StoragePath(writeConfig.getBasePath()),
-            writeConfig.getBooleanOrDefault(HoodieReaderConfig.USE_NATIVE_HFILE_READER));
+            records, header, writeConfig.getHFileCompressionAlgorithm(), new StoragePath(writeConfig.getBasePath()));
       case PARQUET_DATA_BLOCK:
         return new HoodieParquetDataBlock(
             records,
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java
index 7895a5e4ced1..72594892d2d8 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java
@@ -22,6 +22,7 @@ import org.apache.hudi.avro.model.HoodieMetadataRecord;
 import org.apache.hudi.client.FailOnFirstErrorWriteStatus;
 import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.config.HoodieReaderConfig;
 import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.fs.ConsistencyGuardConfig;
@@ -199,6 +200,13 @@ public class HoodieMetadataWriteUtils {
       properties.put(HoodieMetricsConfig.METRICS_REPORTER_PREFIX.key(),
           writeConfig.getMetricReporterMetricsNamePrefix() + METADATA_TABLE_NAME_SUFFIX);
     }
+    // HFile caching properties
+    properties.put(HoodieReaderConfig.HFILE_BLOCK_CACHE_ENABLED.key(),
+        writeConfig.getBooleanOrDefault(HoodieReaderConfig.HFILE_BLOCK_CACHE_ENABLED));
+    properties.put(HoodieReaderConfig.HFILE_BLOCK_CACHE_SIZE.key(),
+        writeConfig.getIntOrDefault(HoodieReaderConfig.HFILE_BLOCK_CACHE_SIZE));
+    properties.put(HoodieReaderConfig.HFILE_BLOCK_CACHE_TTL_MINUTES.key(),
+        writeConfig.getIntOrDefault(HoodieReaderConfig.HFILE_BLOCK_CACHE_TTL_MINUTES));
     builder.withProperties(properties);
 
     if (writeConfig.isMetricsOn()) {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
index 6e72397d541a..932841c98d48 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
@@ -28,6 +28,7 @@ import org.apache.hudi.common.engine.ReaderContextFactory;
 import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.model.CompactionOperation;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
@@ -135,7 +136,13 @@ public abstract class HoodieCompactor<T, I, K, O> implements Serializable {
               operation -> logCompact(config, operation, compactionInstantTime, instantRange, table, taskContextSupplier))
           .flatMap(List::iterator);
     } else {
-      ReaderContextFactory<T> readerContextFactory = context.getReaderContextFactory(metaClient);
+      ReaderContextFactory<T> readerContextFactory;
+      if (!metaClient.isMetadataTable()) {
+        readerContextFactory = context.getReaderContextFactory(metaClient);
+      } else {
+        // Payload and HFile caching props are required here
+        readerContextFactory = (ReaderContextFactory<T>) context.getReaderContextFactoryForWrite(metaClient, HoodieRecordType.AVRO, config.getProps());
+      }
       return context.parallelize(operations).map(
               operation -> compact(config, operation, compactionInstantTime, readerContextFactory.getContext(), table, maxInstantTime, taskContextSupplier))
           .flatMap(List::iterator);
@@ -164,7 +171,8 @@ public abstract class HoodieCompactor<T, I, K, O> implements Serializable {
                                       Option<InstantRange> instantRange,
                                       HoodieTable table,
                                       TaskContextSupplier taskContextSupplier) throws IOException {
-    HoodieReaderContext<IndexedRecord> readerContext = new HoodieAvroReaderContext(table.getStorageConf(), table.getMetaClient().getTableConfig(), instantRange, Option.empty());
+    HoodieReaderContext<IndexedRecord> readerContext = new HoodieAvroReaderContext(
+        table.getStorageConf(), table.getMetaClient().getTableConfig(), instantRange, Option.empty(), writeConfig.getProps());
     FileGroupReaderBasedAppendHandle<IndexedRecord, ?, ?, ?> appendHandle = new FileGroupReaderBasedAppendHandle<>(writeConfig, instantTime, table, operation,  taskContextSupplier, readerContext);
     appendHandle.doAppend();
     return appendHandle.close();
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
index 8c45c9c35f9d..edbdc1ef0703 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
@@ -248,7 +248,7 @@ public class HoodieFlinkEngineContext extends HoodieEngineContext {
   public ReaderContextFactory<?> getReaderContextFactory(HoodieTableMetaClient metaClient) {
     // metadata table reads are only supported by the AvroReaderContext.
     if (metaClient.isMetadataTable()) {
-      return new AvroReaderContextFactory(metaClient);
+      return new AvroReaderContextFactory(metaClient, new TypedProperties());
     }
     return getEngineReaderContextFactory(metaClient);
   }
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java
index a96caf311328..18383a3e4688 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java
@@ -199,7 +199,7 @@ public class HoodieJavaEngineContext extends HoodieEngineContext {
 
   @Override
   public ReaderContextFactory<IndexedRecord> getEngineReaderContextFactory(HoodieTableMetaClient metaClient) {
-    return new AvroReaderContextFactory(metaClient);
+    return new AvroReaderContextFactory(metaClient, new TypedProperties());
   }
 
   @Override
diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
index cc2e5803a809..f263ab7a7827 100644
--- a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
+++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
@@ -949,7 +949,8 @@ public class TestJavaHoodieBackedMetadata extends TestHoodieMetadataBase {
     if (enableMetaFields) {
       schema = HoodieAvroUtils.addMetadataFields(schema);
     }
-    HoodieAvroReaderContext readerContext = new HoodieAvroReaderContext(metadataMetaClient.getStorageConf(), metadataMetaClient.getTableConfig(), Option.empty(), Option.empty());
+    HoodieAvroReaderContext readerContext = new HoodieAvroReaderContext(metadataMetaClient.getStorageConf(), metadataMetaClient.getTableConfig(), Option.empty(), Option.empty(),
+        new TypedProperties());
     HoodieFileGroupReader<IndexedRecord> fileGroupReader = HoodieFileGroupReader.<IndexedRecord>newBuilder()
         .withReaderContext(readerContext)
         .withHoodieTableMetaClient(metadataMetaClient)
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java
index 09d371d63eae..fca42be1a97e 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java
@@ -260,7 +260,7 @@ public class HoodieSparkEngineContext extends HoodieEngineContext {
   public ReaderContextFactory<?> getReaderContextFactory(HoodieTableMetaClient metaClient) {
     // metadata table reads are only supported by the AvroReaderContext.
     if (metaClient.isMetadataTable()) {
-      return new AvroReaderContextFactory(metaClient);
+      return new AvroReaderContextFactory(metaClient, new TypedProperties());
     }
     return getEngineReaderContextFactory(metaClient);
   }
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java
index c659c6beecaf..5af1aadd4156 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java
@@ -21,6 +21,7 @@ package org.apache.hudi.avro;
 
 import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.config.RecordMergeMode;
+import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.EngineType;
 import org.apache.hudi.common.engine.HoodieReaderContext;
 import org.apache.hudi.common.fs.FSUtils;
@@ -34,6 +35,7 @@ import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.log.InstantRange;
 import org.apache.hudi.common.table.read.BufferedRecord;
 import org.apache.hudi.common.table.read.BufferedRecordSerializer;
+import org.apache.hudi.common.util.ConfigUtils;
 import org.apache.hudi.common.util.HoodieRecordUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.SizeEstimator;
@@ -77,12 +79,21 @@ public class HoodieAvroReaderContext extends HoodieReaderContext<IndexedRecord>
    * @param instantRangeOpt the set of valid instants for this read
    * @param filterOpt an optional filter to apply on the record keys
    */
+  public HoodieAvroReaderContext(
+      StorageConfiguration<?> storageConfiguration,
+      HoodieTableConfig tableConfig,
+      Option<InstantRange> instantRangeOpt,
+      Option<Predicate> filterOpt,
+      TypedProperties props) {
+    this(storageConfiguration, tableConfig, instantRangeOpt, filterOpt, Collections.emptyMap(), tableConfig.getPayloadClass(), new HoodieConfig(props));
+  }
+
   public HoodieAvroReaderContext(
       StorageConfiguration<?> storageConfiguration,
       HoodieTableConfig tableConfig,
       Option<InstantRange> instantRangeOpt,
       Option<Predicate> filterOpt) {
-    this(storageConfiguration, tableConfig, instantRangeOpt, filterOpt, Collections.emptyMap());
+    this(storageConfiguration, tableConfig, instantRangeOpt, filterOpt, Collections.emptyMap(), tableConfig.getPayloadClass(), ConfigUtils.DEFAULT_HUDI_CONFIG_FOR_READER);
   }
 
   /**
@@ -100,8 +111,9 @@ public class HoodieAvroReaderContext extends HoodieReaderContext<IndexedRecord>
       HoodieTableConfig tableConfig,
       Option<InstantRange> instantRangeOpt,
       Option<Predicate> filterOpt,
-      Map<StoragePath, HoodieAvroFileReader> reusableFileReaders) {
-    this(storageConfiguration, tableConfig, instantRangeOpt, filterOpt, reusableFileReaders, tableConfig.getPayloadClass());
+      Map<StoragePath, HoodieAvroFileReader> reusableFileReaders,
+      TypedProperties props) {
+    this(storageConfiguration, tableConfig, instantRangeOpt, filterOpt, reusableFileReaders, tableConfig.getPayloadClass(), new HoodieConfig(props));
   }
 
   /**
@@ -110,12 +122,14 @@ public class HoodieAvroReaderContext extends HoodieReaderContext<IndexedRecord>
    * @param storageConfiguration the storage configuration to use for reading files
    * @param tableConfig          the configuration of the Hudi table being read
    * @param payloadClassName     the payload class for the writer
+   * @param props                the reader configurations that should be used when performing reads
    */
   public HoodieAvroReaderContext(
       StorageConfiguration<?> storageConfiguration,
       HoodieTableConfig tableConfig,
-      String payloadClassName) {
-    this(storageConfiguration, tableConfig, Option.empty(), Option.empty(), Collections.emptyMap(), payloadClassName);
+      String payloadClassName,
+      TypedProperties props) {
+    this(storageConfiguration, tableConfig, Option.empty(), Option.empty(), Collections.emptyMap(), payloadClassName, new HoodieConfig(props));
   }
 
   private HoodieAvroReaderContext(
@@ -124,8 +138,9 @@ public class HoodieAvroReaderContext extends HoodieReaderContext<IndexedRecord>
       Option<InstantRange> instantRangeOpt,
       Option<Predicate> filterOpt,
       Map<StoragePath, HoodieAvroFileReader> reusableFileReaders,
-      String payloadClassName) {
-    super(storageConfiguration, tableConfig, instantRangeOpt, filterOpt, new AvroRecordContext(tableConfig, payloadClassName));
+      String payloadClassName,
+      HoodieConfig hoodieReaderConfig) {
+    super(storageConfiguration, tableConfig, instantRangeOpt, filterOpt, new AvroRecordContext(tableConfig, payloadClassName), hoodieReaderConfig);
     this.reusableFileReaders = reusableFileReaders;
     this.isMultiFormat = tableConfig.isMultipleBaseFileFormatsEnabled();
   }
@@ -138,7 +153,7 @@ public class HoodieAvroReaderContext extends HoodieReaderContext<IndexedRecord>
     HoodieAvroFileReader reader = getOrCreateFileReader(storagePathInfo.getPath(), isLogFile, format -> {
       try {
         return (HoodieAvroFileReader) HoodieIOFactory.getIOFactory(storage)
-            .getReaderFactory(HoodieRecord.HoodieRecordType.AVRO).getFileReader(new HoodieConfig(),
+            .getReaderFactory(HoodieRecord.HoodieRecordType.AVRO).getFileReader(hoodieReaderConfig,
                 storagePathInfo, format, Option.empty());
       } catch (IOException e) {
         throw new HoodieIOException("Failed to create avro records iterator from file path " + storagePathInfo.getPath(), e);
@@ -160,7 +175,7 @@ public class HoodieAvroReaderContext extends HoodieReaderContext<IndexedRecord>
     HoodieAvroFileReader reader = getOrCreateFileReader(filePath, isLogFile, format -> {
       try {
         return (HoodieAvroFileReader) HoodieIOFactory.getIOFactory(storage)
-            .getReaderFactory(HoodieRecord.HoodieRecordType.AVRO).getFileReader(new HoodieConfig(),
+            .getReaderFactory(HoodieRecord.HoodieRecordType.AVRO).getFileReader(hoodieReaderConfig,
                 filePath, format, Option.empty());
       } catch (IOException e) {
         throw new HoodieIOException("Failed to create avro records iterator from file path " + filePath, e);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieReaderConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieReaderConfig.java
index fa7a1346b1a0..702309874261 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieReaderConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieReaderConfig.java
@@ -29,12 +29,6 @@ import javax.annotation.concurrent.Immutable;
     groupName = ConfigGroups.Names.READER,
     description = "Configurations that control file group reading.")
 public class HoodieReaderConfig extends HoodieConfig {
-  public static final ConfigProperty<Boolean> USE_NATIVE_HFILE_READER = ConfigProperty
-      .key("_hoodie.hfile.use.native.reader")
-      .defaultValue(true)
-      .markAdvanced()
-      .sinceVersion("1.0.0")
-      .withDocumentation("When enabled, the native HFile reader is used to read HFiles.  This is an internal config.");
 
   public static final ConfigProperty<String> COMPACTION_LAZY_BLOCK_READ_ENABLE = ConfigProperty
       .key("hoodie.compaction.lazy.block.read")
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/AvroReaderContextFactory.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/AvroReaderContextFactory.java
index 0a8eec241fce..590f76a8cc9d 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/engine/AvroReaderContextFactory.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/AvroReaderContextFactory.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.common.engine;
 
 import org.apache.hudi.avro.HoodieAvroReaderContext;
+import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 
 import org.apache.avro.generic.IndexedRecord;
@@ -29,18 +30,20 @@ import org.apache.avro.generic.IndexedRecord;
 public class AvroReaderContextFactory implements ReaderContextFactory<IndexedRecord> {
   private final HoodieTableMetaClient metaClient;
   private final String payloadClassName;
+  private final TypedProperties props;
 
-  public AvroReaderContextFactory(HoodieTableMetaClient metaClient) {
-    this(metaClient, metaClient.getTableConfig().getPayloadClass());
+  public AvroReaderContextFactory(HoodieTableMetaClient metaClient, TypedProperties props) {
+    this(metaClient, metaClient.getTableConfig().getPayloadClass(), props);
   }
 
-  public AvroReaderContextFactory(HoodieTableMetaClient metaClient, String payloadClassName) {
+  public AvroReaderContextFactory(HoodieTableMetaClient metaClient, String payloadClassName, TypedProperties props) {
     this.metaClient = metaClient;
     this.payloadClassName = payloadClassName;
+    this.props = props;
   }
 
   @Override
   public HoodieReaderContext<IndexedRecord> getContext() {
-    return new HoodieAvroReaderContext(metaClient.getStorageConf(), metaClient.getTableConfig(), payloadClassName);
+    return new HoodieAvroReaderContext(metaClient.getStorageConf(), metaClient.getTableConfig(), payloadClassName, props);
   }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
index f288baae2c97..2f929b5f106e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
@@ -147,7 +147,7 @@ public abstract class HoodieEngineContext {
                                                                  TypedProperties properties) {
     if (recordType == HoodieRecord.HoodieRecordType.AVRO) {
       String payloadClass = ConfigUtils.getPayloadClass(properties);
-      return new AvroReaderContextFactory(metaClient, payloadClass);
+      return new AvroReaderContextFactory(metaClient, payloadClass, properties);
     }
     return getEngineReaderContextFactory(metaClient);
   }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java
index 469225cf3c87..9abf88875f1d 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java
@@ -195,7 +195,7 @@ public final class HoodieLocalEngineContext extends HoodieEngineContext {
 
   @Override
   public ReaderContextFactory<IndexedRecord> getEngineReaderContextFactory(HoodieTableMetaClient metaClient) {
-    return new AvroReaderContextFactory(metaClient);
+    return new AvroReaderContextFactory(metaClient, new TypedProperties());
   }
 
   @Override
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
index c64e7fbee7a7..fb6fcf612999 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
@@ -19,6 +19,7 @@
 
 package org.apache.hudi.common.engine;
 
+import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieFileFormat;
@@ -94,18 +95,29 @@ public abstract class HoodieReaderContext<T> {
   private FileGroupReaderSchemaHandler<T> schemaHandler = null;
   // the default iterator mode is engine-specific record mode
   private IteratorMode iteratorMode = IteratorMode.ENGINE_RECORD;
+  protected final HoodieConfig hoodieReaderConfig;
+
+  protected HoodieReaderContext(StorageConfiguration<?> storageConfiguration,
+      HoodieTableConfig tableConfig,
+      Option<InstantRange> instantRangeOpt,
+      Option<Predicate> keyFilterOpt,
+      RecordContext<T> recordContext) {
+    this(storageConfiguration, tableConfig, instantRangeOpt, keyFilterOpt, recordContext, ConfigUtils.DEFAULT_HUDI_CONFIG_FOR_READER);
+  }
 
   protected HoodieReaderContext(StorageConfiguration<?> storageConfiguration,
                                 HoodieTableConfig tableConfig,
                                 Option<InstantRange> instantRangeOpt,
                                 Option<Predicate> keyFilterOpt,
-                                RecordContext<T> recordContext) {
+                                RecordContext<T> recordContext,
+                                HoodieConfig hoodieReaderConfig) {
     this.tableConfig = tableConfig;
     this.storageConfiguration = storageConfiguration;
     this.baseFileFormat = tableConfig.getBaseFileFormat();
     this.instantRangeOpt = instantRangeOpt;
     this.keyFilterOpt = keyFilterOpt;
     this.recordContext = recordContext;
+    this.hoodieReaderConfig = hoodieReaderConfig;
   }
 
   // Getter and Setter for schemaHandler
@@ -213,6 +225,10 @@ public abstract class HoodieReaderContext<T> {
     return recordContext;
   }
 
+  public HoodieConfig getHoodieReaderConfig() {
+    return hoodieReaderConfig;
+  }
+
   /**
    * Gets the record iterator based on the type of engine-specific record representation from the
    * file.
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
index 5202f68e0279..72aabb410526 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
@@ -18,7 +18,6 @@
 
 package org.apache.hudi.common.table.log;
 
-import org.apache.hudi.common.config.HoodieReaderConfig;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -198,9 +197,7 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader {
             String.format("HFile block could not be of version (%d)", HoodieLogFormatVersion.DEFAULT_VERSION));
         return new HoodieHFileDataBlock(
             () -> getDataInputStream(storage, this.logFile, bufferSize), content, true, logBlockContentLoc,
-            Option.ofNullable(readerSchema), header, footer, enableRecordLookups, logFile.getPath(),
-            storage.getConf().getBoolean(HoodieReaderConfig.USE_NATIVE_HFILE_READER.key(),
-                HoodieReaderConfig.USE_NATIVE_HFILE_READER.defaultValue()));
+            Option.ofNullable(readerSchema), header, footer, enableRecordLookups, logFile.getPath());
 
       case PARQUET_DATA_BLOCK:
         checkState(nextBlockVersion.getVersion() != HoodieLogFormatVersion.DEFAULT_VERSION,
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
index 6480ce83eff5..0ee914c32dc5 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
@@ -18,12 +18,11 @@
 
 package org.apache.hudi.common.table.log.block;
 
-import org.apache.hudi.common.config.HoodieConfig;
-import org.apache.hudi.common.config.HoodieReaderConfig;
 import org.apache.hudi.common.engine.HoodieReaderContext;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
+import org.apache.hudi.common.util.ConfigUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.CloseableMappingIterator;
@@ -64,7 +63,6 @@ public class HoodieHFileDataBlock extends HoodieDataBlock {
   // This path is used for constructing HFile reader context, which should not be
   // interpreted as the actual file path for the HFile data blocks
   private final StoragePath pathForReader;
-  private final HoodieConfig hFileReaderConfig;
 
   public HoodieHFileDataBlock(Supplier<SeekableDataInputStream> inputStreamSupplier,
                               Option<byte[]> content,
@@ -74,24 +72,20 @@ public class HoodieHFileDataBlock extends HoodieDataBlock {
                               Map<HeaderMetadataType, String> header,
                               Map<FooterMetadataType, String> footer,
                               boolean enablePointLookups,
-                              StoragePath pathForReader,
-                              boolean useNativeHFileReader) {
+                              StoragePath pathForReader) {
     super(content, inputStreamSupplier, readBlockLazily, Option.of(logBlockContentLocation), readerSchema,
         header, footer, HoodieAvroHFileReaderImplBase.KEY_FIELD_NAME, enablePointLookups);
     this.compressionCodec = Option.empty();
     this.pathForReader = pathForReader;
-    this.hFileReaderConfig = getHFileReaderConfig(useNativeHFileReader);
   }
 
   public HoodieHFileDataBlock(List<HoodieRecord> records,
                               Map<HeaderMetadataType, String> header,
                               String compressionCodec,
-                              StoragePath pathForReader,
-                              boolean useNativeHFileReader) {
+                              StoragePath pathForReader) {
     super(records, header, new HashMap<>(), HoodieAvroHFileReaderImplBase.KEY_FIELD_NAME);
     this.compressionCodec = Option.of(compressionCodec);
     this.pathForReader = pathForReader;
-    this.hFileReaderConfig = getHFileReaderConfig(useNativeHFileReader);
   }
 
   @Override
@@ -118,7 +112,7 @@ public class HoodieHFileDataBlock extends HoodieDataBlock {
     // Read the content
     try (HoodieFileReader reader = HoodieIOFactory.getIOFactory(inlineStorage)
         .getReaderFactory(HoodieRecordType.AVRO)
-        .getContentReader(hFileReaderConfig, pathForReader, HoodieFileFormat.HFILE,
+        .getContentReader(ConfigUtils.DEFAULT_HUDI_CONFIG_FOR_READER, pathForReader, HoodieFileFormat.HFILE,
             inlineStorage, content, Option.of(getSchemaFromHeader()))) {
       return unsafeCast(reader.getRecordIterator(readerSchema));
     }
@@ -151,7 +145,7 @@ public class HoodieHFileDataBlock extends HoodieDataBlock {
     // Read the content
     try (HoodieAvroHFileReaderImplBase reader = (HoodieAvroHFileReaderImplBase)
         HoodieIOFactory.getIOFactory(inlineStorage).getReaderFactory(HoodieRecordType.AVRO).getContentReader(
-            hFileReaderConfig, pathForReader, HoodieFileFormat.HFILE, inlineStorage, content,
+            readerContext.getHoodieReaderConfig(), pathForReader, HoodieFileFormat.HFILE, inlineStorage, content,
             Option.of(getSchemaFromHeader()))) {
       return unsafeCast(reader.getIndexedRecordIterator(readerSchema, readerSchema));
     }
@@ -175,7 +169,7 @@ public class HoodieHFileDataBlock extends HoodieDataBlock {
     try (final HoodieAvroHFileReaderImplBase reader = (HoodieAvroHFileReaderImplBase) HoodieIOFactory
         .getIOFactory(inlineStorage)
         .getReaderFactory(HoodieRecordType.AVRO)
-        .getFileReader(hFileReaderConfig, inlinePath, HoodieFileFormat.HFILE, Option.of(getSchemaFromHeader()))) {
+        .getFileReader(ConfigUtils.DEFAULT_HUDI_CONFIG_FOR_READER, inlinePath, HoodieFileFormat.HFILE, Option.of(getSchemaFromHeader()))) {
       // Get writer's schema from the header
       final ClosableIterator<HoodieRecord<IndexedRecord>> recordIterator =
           fullKey ? reader.getRecordsByKeysIterator(sortedKeys, readerSchema) : reader.getRecordsByKeyPrefixIterator(sortedKeys, readerSchema);
@@ -183,11 +177,4 @@ public class HoodieHFileDataBlock extends HoodieDataBlock {
       return new CloseableMappingIterator<>(recordIterator, data -> (HoodieRecord<T>) data);
     }
   }
-
-  private HoodieConfig getHFileReaderConfig(boolean useNativeHFileReader) {
-    HoodieConfig config = new HoodieConfig();
-    config.setValue(
-        HoodieReaderConfig.USE_NATIVE_HFILE_READER, Boolean.toString(useNativeHFileReader));
-    return config;
-  }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java
index 63ff1428b708..816394a207e4 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java
@@ -19,7 +19,10 @@
 package org.apache.hudi.common.util;
 
 import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.HoodieCommonConfig;
 import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.config.HoodieReaderConfig;
 import org.apache.hudi.common.config.PropertiesConfig;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodiePayloadProps;
@@ -52,7 +55,10 @@ import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
-import static org.apache.hudi.common.config.HoodieReaderConfig.USE_NATIVE_HFILE_READER;
+import static org.apache.hudi.common.config.HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED;
+import static org.apache.hudi.common.config.HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE;
+import static org.apache.hudi.common.config.HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE;
+import static org.apache.hudi.common.config.HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH;
 import static org.apache.hudi.common.table.HoodieTableConfig.TABLE_CHECKSUM;
 import static org.apache.hudi.keygen.constant.KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED;
 
@@ -748,8 +754,25 @@ public class ConfigUtils {
   public static HoodieConfig getReaderConfigs(StorageConfiguration<?> storageConf) {
     HoodieConfig config = new HoodieConfig();
     config.setAll(DEFAULT_HUDI_CONFIG_FOR_READER.getProps());
-    config.setValue(USE_NATIVE_HFILE_READER,
-        Boolean.toString(storageConf.getBoolean(USE_NATIVE_HFILE_READER.key(), USE_NATIVE_HFILE_READER.defaultValue())));
+    return config;
+  }
+
+  /**
+   * Builds HFile cache reader configurations from the given options.
+   * This method extracts the HFile cache-related settings from the provided options map
+   * and applies them to a new HoodieReaderConfig instance.
+   *
+   * @param options Map of options containing HFile cache configurations
+   * @return HoodieReaderConfig with the HFile cache configurations applied
+   */
+  public static HoodieReaderConfig getHFileCacheConfigs(Map<String, String> options) {
+    HoodieReaderConfig config = new HoodieReaderConfig();
+    config.setValue(HoodieReaderConfig.HFILE_BLOCK_CACHE_ENABLED,
+        getStringWithAltKeys(options, HoodieReaderConfig.HFILE_BLOCK_CACHE_ENABLED));
+    config.setValue(HoodieReaderConfig.HFILE_BLOCK_CACHE_SIZE,
+        getStringWithAltKeys(options, HoodieReaderConfig.HFILE_BLOCK_CACHE_SIZE));
+    config.setValue(HoodieReaderConfig.HFILE_BLOCK_CACHE_TTL_MINUTES,
+        getStringWithAltKeys(options, HoodieReaderConfig.HFILE_BLOCK_CACHE_TTL_MINUTES));
     return config;
   }
 
@@ -789,4 +812,35 @@ public class ConfigUtils {
     }
     return mergeProperties;
   }
+
+  /**
+   * Derive necessary properties for FG reader.
+   */
+  public static TypedProperties buildFileGroupReaderProperties(HoodieMetadataConfig metadataConfig,
+                                                               boolean shouldReuse) {
+    HoodieCommonConfig commonConfig = HoodieCommonConfig.newBuilder()
+        .fromProperties(metadataConfig.getProps()).build();
+    TypedProperties props = new TypedProperties();
+    props.setProperty(
+        MAX_MEMORY_FOR_MERGE.key(),
+        Long.toString(metadataConfig.getMaxReaderMemory()));
+    props.setProperty(
+        SPILLABLE_MAP_BASE_PATH.key(),
+        metadataConfig.getSplliableMapDir());
+    props.setProperty(
+        SPILLABLE_DISK_MAP_TYPE.key(),
+        commonConfig.getSpillableDiskMapType().name());
+    props.setProperty(
+        DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(),
+        Boolean.toString(commonConfig.isBitCaskDiskMapCompressionEnabled()));
+    props.setProperty(HoodieReaderConfig.HFILE_BLOCK_CACHE_ENABLED.key(),
+        shouldReuse ? "true" : metadataConfig.getStringOrDefault(HoodieReaderConfig.HFILE_BLOCK_CACHE_ENABLED));
+    props.setProperty(HoodieReaderConfig.HFILE_BLOCK_CACHE_SIZE.key(),
+        metadataConfig.getStringOrDefault(HoodieReaderConfig.HFILE_BLOCK_CACHE_SIZE));
+    props.setProperty(HoodieReaderConfig.HFILE_BLOCK_CACHE_TTL_MINUTES.key(),
+        metadataConfig.getStringOrDefault(HoodieReaderConfig.HFILE_BLOCK_CACHE_TTL_MINUTES));
+    return props;
+  }
 }
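
For reference, the helper added above can be exercised on its own: getHFileCacheConfigs turns a
plain options map into a HoodieReaderConfig. A minimal sketch, assuming only the option keys that
appear in the tests touched by this commit (the class name and values are illustrative):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.hudi.common.config.HoodieReaderConfig;
    import org.apache.hudi.common.util.ConfigUtils;

    final class HFileCacheConfigSketch {
      static HoodieReaderConfig fromOptions() {
        Map<String, String> options = new HashMap<>();
        // Option keys as they appear in the tests in this commit.
        options.put("hoodie.hfile.block.cache.enabled", "true");
        options.put("hoodie.hfile.block.cache.size", "200");
        // Extracts the HFile cache settings (enabled/size/TTL) into a reader config.
        return ConfigUtils.getHFileCacheConfigs(options);
      }
    }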
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java
index 912fc6a76a0c..60d9afe199e5 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java
@@ -19,7 +19,6 @@
 package org.apache.hudi.io.storage;
 
 import org.apache.hudi.common.config.HoodieConfig;
-import org.apache.hudi.common.config.HoodieReaderConfig;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.util.Option;
@@ -134,7 +133,4 @@ public class HoodieFileReaderFactory {
     throw new UnsupportedOperationException();
   }
 
-  protected static boolean isUseNativeHFileReaderEnabled(HoodieConfig hoodieConfig) {
-    return hoodieConfig.getBooleanOrDefault(HoodieReaderConfig.USE_NATIVE_HFILE_READER);
-  }
 }
\ No newline at end of file
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index 9fb9a6c93cb4..63c3bcf5b102 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -21,10 +21,8 @@ package org.apache.hudi.metadata;
 import org.apache.hudi.avro.HoodieAvroReaderContext;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieMetadataRecord;
-import org.apache.hudi.common.config.HoodieCommonConfig;
 import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
-import org.apache.hudi.common.config.HoodieReaderConfig;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.data.HoodieListData;
@@ -49,6 +47,7 @@ import org.apache.hudi.common.table.read.buffer.FileGroupRecordBufferLoader;
 import org.apache.hudi.common.table.read.buffer.ReusableFileGroupRecordBufferLoader;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.ConfigUtils;
 import org.apache.hudi.common.util.HoodieDataUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
@@ -89,7 +88,6 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
@@ -97,10 +95,6 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
-import static org.apache.hudi.common.config.HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED;
-import static org.apache.hudi.common.config.HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE;
-import static org.apache.hudi.common.config.HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE;
-import static org.apache.hudi.common.config.HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH;
 import static org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FULL_SCAN_LOG_FILES;
 import static org.apache.hudi.common.util.ValidationUtils.checkState;
 import static org.apache.hudi.metadata.HoodieMetadataPayload.KEY_FIELD_NAME;
@@ -510,17 +504,16 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
     Map<StoragePath, HoodieAvroFileReader> baseFileReaders = Collections.emptyMap();
     ReusableFileGroupRecordBufferLoader<IndexedRecord> recordBufferLoader = null;
     boolean shouldReuse = reuse && isFullScanAllowedForPartition(fileSlice.getPartitionPath());
+    TypedProperties fileGroupReaderProps = ConfigUtils.buildFileGroupReaderProperties(metadataConfig, shouldReuse);
     if (shouldReuse) {
       Pair<HoodieAvroFileReader, ReusableFileGroupRecordBufferLoader<IndexedRecord>> readers =
           reusableFileReaders.computeIfAbsent(fileSlice.getFileGroupId(), fgId -> {
             try {
               HoodieAvroFileReader baseFileReader = null;
               if (fileSlice.getBaseFile().isPresent()) {
-                TypedProperties props = TypedProperties.copy(metadataConfig.getProps());
-                setHFileBlockCacheProps(props);
-                HoodieConfig newConfig = new HoodieConfig(props);
+                HoodieConfig fileGroupReaderConfig = new HoodieConfig(fileGroupReaderProps);
                 baseFileReader = (HoodieAvroFileReader) HoodieIOFactory.getIOFactory(storage).getReaderFactory(HoodieRecord.HoodieRecordType.AVRO)
-                    .getFileReader(newConfig, fileSlice.getBaseFile().get().getStoragePath(), metadataMetaClient.getTableConfig().getBaseFileFormat(), Option.empty());
+                    .getFileReader(fileGroupReaderConfig, fileSlice.getBaseFile().get().getStoragePath(), metadataMetaClient.getTableConfig().getBaseFileFormat(), Option.empty());
               }
               return Pair.of(baseFileReader, buildReusableRecordBufferLoader(fileSlice, latestMetadataInstantTime, instantRange));
             } catch (IOException ex) {
@@ -540,7 +533,8 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
         metadataMetaClient.getTableConfig(),
         instantRange,
         Option.of(predicate),
-        baseFileReaders);
+        baseFileReaders,
+        fileGroupReaderProps);
 
     HoodieFileGroupReader<IndexedRecord> fileGroupReader = HoodieFileGroupReader.<IndexedRecord>newBuilder()
         .withReaderContext(readerContext)
@@ -549,7 +543,7 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
         .withFileSlice(fileSlice)
         .withDataSchema(SCHEMA)
         .withRequestedSchema(SCHEMA)
-        .withProps(buildFileGroupReaderProperties(metadataConfig, shouldReuse))
+        .withProps(fileGroupReaderProps)
         .withRecordBufferLoader(recordBufferLoader)
         .withEnableOptimizedLogBlockScan(metadataConfig.isOptimizedLogBlocksScanEnabled())
         .build();
@@ -564,7 +558,8 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
         storageConf,
         metadataMetaClient.getTableConfig(),
         instantRangeOption,
-        Option.empty());
+        Option.empty(),
+        ConfigUtils.buildFileGroupReaderProperties(metadataConfig, true));
     readerContext.initRecordMerger(metadataConfig.getProps());
     readerContext.setHasBootstrapBaseFile(false);
     readerContext.setHasLogFiles(fileSlice.hasLogFiles());
@@ -858,41 +853,4 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
     return readIndexRecords(rawKeys, partitionName, Option.empty())
         .mapToPair(hoodieRecord -> SecondaryIndexKeyUtils.getSecondaryKeyRecordKeyPair(hoodieRecord.getRecordKey()));
   }
-
-  /**
-   * Derive necessary properties for FG reader.
-   */
-  TypedProperties buildFileGroupReaderProperties(HoodieMetadataConfig metadataConfig, boolean shouldReuse) {
-    HoodieCommonConfig commonConfig = HoodieCommonConfig.newBuilder()
-        .fromProperties(metadataConfig.getProps()).build();
-    TypedProperties props = new TypedProperties();
-    props.setProperty(
-        MAX_MEMORY_FOR_MERGE.key(),
-        Long.toString(metadataConfig.getMaxReaderMemory()));
-    props.setProperty(
-        SPILLABLE_MAP_BASE_PATH.key(),
-        metadataConfig.getSplliableMapDir());
-    props.setProperty(
-        SPILLABLE_DISK_MAP_TYPE.key(),
-        commonConfig.getSpillableDiskMapType().name());
-    props.setProperty(
-        DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(),
-        Boolean.toString(commonConfig.isBitCaskDiskMapCompressionEnabled()));
-    if (shouldReuse) {
-      setHFileBlockCacheProps(props);
-    } else {
-      props.setProperty(HoodieReaderConfig.HFILE_BLOCK_CACHE_ENABLED.key(),
-          metadataConfig.getStringOrDefault(HoodieReaderConfig.HFILE_BLOCK_CACHE_ENABLED));
-    }
-    props.setProperty(HoodieReaderConfig.HFILE_BLOCK_CACHE_SIZE.key(),
-        metadataConfig.getStringOrDefault(HoodieReaderConfig.HFILE_BLOCK_CACHE_SIZE));
-    props.setProperty(HoodieReaderConfig.HFILE_BLOCK_CACHE_TTL_MINUTES.key(),
-        metadataConfig.getStringOrDefault(HoodieReaderConfig.HFILE_BLOCK_CACHE_TTL_MINUTES));
-    return props;
-  }
-
-  private void setHFileBlockCacheProps(Properties props) {
-    // Enable HFile block caching for reuse and full scan usage
-    props.setProperty(HoodieReaderConfig.HFILE_BLOCK_CACHE_ENABLED.key(), "true");
-  }
 }
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileSliceTestUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileSliceTestUtils.java
index 739cd4bb55ea..c3b42c278778 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileSliceTestUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileSliceTestUtils.java
@@ -21,7 +21,6 @@ package org.apache.hudi.common.testutils.reader;
 
 import org.apache.hudi.common.bloom.BloomFilterTypeCode;
 import org.apache.hudi.common.config.HoodieConfig;
-import org.apache.hudi.common.config.HoodieReaderConfig;
 import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.LocalTaskContextSupplier;
@@ -198,8 +197,7 @@ public class HoodieFileSliceTestUtils {
             records,
             header,
             HFILE_COMPRESSION_ALGORITHM_NAME.defaultValue(),
-            pathForReader,
-            HoodieReaderConfig.USE_NATIVE_HFILE_READER.defaultValue());
+            pathForReader);
       case PARQUET_DATA_BLOCK:
         return new HoodieParquetDataBlock(
             records,
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java
index e5f4140cdce8..55643d5b239d 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java
@@ -18,7 +18,6 @@
 
 package org.apache.hudi.common.testutils;
 
-import org.apache.hudi.common.config.HoodieReaderConfig;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.engine.HoodieLocalEngineContext;
 import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
@@ -411,7 +410,7 @@ public class HoodieCommonTestHarness {
       case AVRO_DATA_BLOCK:
         return new HoodieAvroDataBlock(records, header, HoodieRecord.RECORD_KEY_METADATA_FIELD);
       case HFILE_DATA_BLOCK:
-        return new HoodieHFileDataBlock(records, header, HFILE_COMPRESSION_ALGORITHM_NAME.defaultValue(), pathForReader, HoodieReaderConfig.USE_NATIVE_HFILE_READER.defaultValue());
+        return new HoodieHFileDataBlock(records, header, HFILE_COMPRESSION_ALGORITHM_NAME.defaultValue(), pathForReader);
       case PARQUET_DATA_BLOCK:
         return new HoodieParquetDataBlock(records, header, HoodieRecord.RECORD_KEY_METADATA_FIELD, PARQUET_COMPRESSION_CODEC_NAME.defaultValue(), 0.1, true);
       default:
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
index c766076d66a2..7468a5868106 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
@@ -19,7 +19,6 @@
 package org.apache.hudi.hadoop.testutils;
 
 import org.apache.hudi.avro.HoodieAvroUtils;
-import org.apache.hudi.common.config.HoodieReaderConfig;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
 import org.apache.hudi.common.model.HoodieFileFormat;
@@ -421,7 +420,7 @@ public class InputFormatTestUtil {
     List<HoodieRecord> hoodieRecords = records.stream().map(HoodieAvroIndexedRecord::new).collect(Collectors.toList());
     if (logBlockType == HoodieLogBlock.HoodieLogBlockType.HFILE_DATA_BLOCK) {
       dataBlock = new HoodieHFileDataBlock(
-          hoodieRecords, header, HFILE_COMPRESSION_ALGORITHM_NAME.defaultValue(), writer.getLogFile().getPath(), HoodieReaderConfig.USE_NATIVE_HFILE_READER.defaultValue());
+          hoodieRecords, header, HFILE_COMPRESSION_ALGORITHM_NAME.defaultValue(), writer.getLogFile().getPath());
     } else if (logBlockType == HoodieLogBlock.HoodieLogBlockType.PARQUET_DATA_BLOCK) {
       dataBlock = new HoodieParquetDataBlock(hoodieRecords, header,
           HoodieRecord.RECORD_KEY_METADATA_FIELD, PARQUET_COMPRESSION_CODEC_NAME.defaultValue(), 0.1, true);
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index e0a02bd3b84c..2ee6c6c92fb5 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -24,7 +24,6 @@ import org.apache.hudi.avro.HoodieAvroUtils
 import org.apache.hudi.avro.HoodieAvroUtils.createNewSchemaField
 import org.apache.hudi.client.utils.SparkInternalSchemaConverter
 import org.apache.hudi.common.config.{ConfigProperty, HoodieConfig, HoodieMetadataConfig}
-import org.apache.hudi.common.config.HoodieReaderConfig.USE_NATIVE_HFILE_READER
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath
 import org.apache.hudi.common.model.{FileSlice, HoodieFileFormat, HoodieRecord}
@@ -840,9 +839,8 @@ object HoodieBaseRelation extends SparkAdapterSupport {
     partitionedFile => {
       val hadoopConf = hadoopConfBroadcast.value.value
       val filePath = sparkAdapter.getSparkPartitionedFileUtils.getPathFromPartitionedFile(partitionedFile)
-      val hoodieConfig = new HoodieConfig()
-      hoodieConfig.setValue(USE_NATIVE_HFILE_READER,
-        options.getOrElse(USE_NATIVE_HFILE_READER.key(), USE_NATIVE_HFILE_READER.defaultValue().toString))
+      val hoodieConfig = ConfigUtils.getHFileCacheConfigs(options.asJava)
+
       val reader = new HoodieSparkIOFactory(
         new HoodieHadoopStorage(filePath, HadoopFSUtils.getStorageConf(hadoopConf)))
         .getReaderFactory(HoodieRecordType.AVRO)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala
index 943126be87f5..3dcbce2186af 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala
@@ -19,7 +19,7 @@ package org.apache.hudi.functional
 
 import org.apache.hudi.DataSourceWriteOptions
 import org.apache.hudi.DataSourceWriteOptions._
-import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.config.{HoodieMetadataConfig, HoodieReaderConfig}
 import org.apache.hudi.common.data.HoodieListData
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
@@ -45,7 +45,7 @@ class RecordLevelIndexTestBase extends HoodieStatsIndexTestBase {
     PARTITIONPATH_FIELD.key -> "partition",
     HoodieTableConfig.POPULATE_META_FIELDS.key -> "true",
     HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key -> "15",
-    "hoodie.hfile.block.cache.enabled" -> "true"
+    HoodieReaderConfig.HFILE_BLOCK_CACHE_SIZE.key() -> "200"
   ) ++ baseOpts ++ metadataOpts
 
   val secondaryIndexOpts: Map[String, String] = Map(
@@ -128,7 +128,7 @@ class RecordLevelIndexTestBase extends HoodieStatsIndexTestBase {
                                              deletedDf: DataFrame = sparkSession.emptyDataFrame): Unit = {
     val writeConfig = getWriteConfig(hudiOpts)
     val metadata = metadataWriter(writeConfig).getTableMetadata
-    val readDf = spark.read.format("hudi").load(basePath)
+    val readDf = spark.read.options(hudiOpts).format("hudi").load(basePath)
     readDf.cache()
     val rowArr = readDf.collect()
     val recordIndexMap = HoodieDataUtils.dedupeAndCollectAsMap(metadata.readRecordIndexLocationsWithKeys(
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
index ef6e2f6a978a..a6b857952437 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
@@ -363,6 +363,7 @@ class TestRecordLevelIndex extends RecordLevelIndexTestBase {
       saveMode = SaveMode.Overwrite)
     insertDf.cache()
 
+    spark.sql(s"SET hoodie.hfile.block.cache.size = 200")
     spark.sql(s"CREATE TABLE IF NOT EXISTS hudi_indexed_table USING hudi 
OPTIONS (hoodie.metadata.enable = 'true', hoodie.metadata.record.index.enable = 
'true', hoodie.write.merge.handle.class = 
'org.apache.hudi.io.FileGroupReaderBasedMergeHandle') LOCATION '$basePath'")
     val existingKeys = dataGen.getExistingKeys
     spark.sql(s"DELETE FROM hudi_indexed_table WHERE _row_key IN 
('${existingKeys.get(0)}', '${existingKeys.get(1)}')")
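
The test above exercises the new size knob both as a writer option (HoodieReaderConfig.HFILE_BLOCK_CACHE_SIZE.key() -> "200") and through a Spark SQL SET. A minimal sketch of the SQL route -- the session setup and values are assumptions for illustration; only the SET statement mirrors the test change:

    import org.apache.spark.sql.SparkSession;

    public final class HFileCacheSessionExample {
      public static void main(String[] args) {
        // Hypothetical local session; a real deployment would also configure
        // Hudi's Spark session extensions and catalog as usual.
        SparkSession spark = SparkSession.builder()
            .appName("hfile-block-cache-example")
            .master("local[1]")
            .getOrCreate();
        // Mirrors the test: size the HFile block cache via a session-level SQL conf
        // before the table is created or queried.
        spark.sql("SET hoodie.hfile.block.cache.size = 200");
        spark.stop();
      }
    }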
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
index b9f3fd5e0004..90e3cddce15c 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
@@ -31,7 +31,6 @@ import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.config.HoodieCommonConfig;
 import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
-import org.apache.hudi.common.config.HoodieReaderConfig;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.data.HoodieListData;
@@ -65,7 +64,6 @@ import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.common.table.view.FileSystemViewStorageType;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.util.CleanerUtils;
-import org.apache.hudi.common.util.ConfigUtils;
 import org.apache.hudi.common.util.FileFormatUtils;
 import org.apache.hudi.common.util.FileIOUtils;
 import org.apache.hudi.common.util.HoodieDataUtils;
@@ -1982,12 +1980,9 @@ public class HoodieMetadataTableValidator implements Serializable {
       StoragePath path = new StoragePath(
           FSUtils.constructAbsolutePath(metaClient.getBasePath(), partitionPath).toString(), filename);
       BloomFilter bloomFilter;
-      HoodieConfig hoodieConfig = new HoodieConfig();
-      hoodieConfig.setValue(HoodieReaderConfig.USE_NATIVE_HFILE_READER,
-          Boolean.toString(ConfigUtils.getBooleanWithAltKeys(props, HoodieReaderConfig.USE_NATIVE_HFILE_READER)));
       try (HoodieFileReader fileReader = getHoodieSparkIOFactory(metaClient.getStorage())
           .getReaderFactory(HoodieRecordType.AVRO)
-          .getFileReader(hoodieConfig, path)) {
+          .getFileReader(new HoodieConfig(), path)) {
         bloomFilter = fileReader.readBloomFilter();
         if (bloomFilter == null) {
           LOG.error("Failed to read bloom filter for {}", path);
