This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 2bbb5feb80d [HUDI-8637] Refactor broadcast to a more generic reader 
context factory (#13148)
2bbb5feb80d is described below

commit 2bbb5feb80d30352f43c0c44dd4762c128160902
Author: Tim Brown <[email protected]>
AuthorDate: Wed Apr 16 21:43:52 2025 -0500

    [HUDI-8637] Refactor broadcast to a more generic reader context factory 
(#13148)
---
 .../apache/hudi/table/EngineBroadcastManager.java  |  58 ------------
 .../apache/hudi/table/HoodieCompactionHandler.java |   5 +-
 .../hudi/table/action/compact/HoodieCompactor.java |  25 ++---
 .../hadoop/TestHoodieFileGroupReaderOnHive.java    |   2 +-
 .../MultipleSparkJobExecutionStrategy.java         |  17 ++--
 .../client/common/HoodieSparkEngineContext.java    |   8 ++
 .../common/SparkReaderContextFactory.java}         |  97 ++++++++-----------
 ...HoodieSparkFileGroupReaderBasedMergeHandle.java |  14 +--
 .../hudi/table/HoodieSparkCopyOnWriteTable.java    |   6 +-
 .../HoodieSparkMergeOnReadTableCompactor.java      |  10 --
 .../hudi/BaseSparkInternalRowReaderContext.java    |   5 +
 .../SparkFileFormatInternalRowReaderContext.scala  |   4 +-
 .../common/TestSparkReaderContextFactory.java}     | 105 +++++++++++----------
 .../hudi/common/engine/HoodieEngineContext.java    |   5 +
 .../hudi/common/engine/HoodieReaderContext.java    |  21 +++--
 .../hudi/common/engine/ReaderContextFactory.java   |  30 ++++++
 .../table/log/HoodieMergedLogRecordReader.java     |   4 +-
 .../common/table/read/HoodieFileGroupReader.java   |   5 +-
 .../hudi/common/util/LocalAvroSchemaCache.java     |  12 +--
 .../testutils/reader/HoodieTestReaderContext.java  |   1 +
 .../hudi/hadoop/HiveHoodieReaderContext.java       |   5 +-
 .../HoodieFileGroupReaderBasedRecordReader.java    |   3 +-
 ...odieFileGroupReaderBasedParquetFileFormat.scala |   2 +-
 ...stSparkFileFormatInternalRowReaderContext.scala |   2 +-
 .../procedures/PartitionBucketIndexManager.scala   |  10 +-
 .../read/TestHoodieFileGroupReaderOnSpark.scala    |   4 +-
 .../TestSpark35RecordPositionMetadataColumn.scala  |   2 +-
 27 files changed, 194 insertions(+), 268 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/EngineBroadcastManager.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/EngineBroadcastManager.java
deleted file mode 100644
index bcee2829dd8..00000000000
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/EngineBroadcastManager.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hudi.table;
-
-import org.apache.hudi.common.engine.HoodieReaderContext;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.storage.StoragePath;
-
-import org.apache.hadoop.conf.Configuration;
-
-import java.io.Serializable;
-
-/**
- * Broadcast variable management for engines.
- */
-public class EngineBroadcastManager implements Serializable {
-
-  /**
-   * Prepares and broadcasts necessary information needed by compactor.
-   */
-  public void prepareAndBroadcast() {
-    // NO operation.
-  }
-
-  /**
-   * Returns the {@link HoodieReaderContext} instance needed by the file group 
reader based on
-   * the broadcast variables.
-   *
-   * @param basePath Table base path
-   */
-  public Option<HoodieReaderContext> 
retrieveFileGroupReaderContext(StoragePath basePath) {
-    return Option.empty();
-  }
-
-  /**
-   * Retrieves the broadcast configuration.
-   */
-  public Option<Configuration> retrieveStorageConfig() {
-    return Option.empty();
-  }
-}
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieCompactionHandler.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieCompactionHandler.java
index e97e613bb4c..87fd81bbd4c 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieCompactionHandler.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieCompactionHandler.java
@@ -28,8 +28,6 @@ import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieNotSupportedException;
 
-import org.apache.hadoop.conf.Configuration;
-
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
@@ -50,8 +48,7 @@ public interface HoodieCompactionHandler<T> {
   default List<WriteStatus> compactUsingFileGroupReader(String instantTime,
                                                         CompactionOperation 
operation,
                                                         HoodieWriteConfig 
writeConfig,
-                                                        HoodieReaderContext 
readerContext,
-                                                        Configuration conf) {
+                                                        HoodieReaderContext 
readerContext) {
     throw new HoodieNotSupportedException("This engine does not support file 
group reader based compaction.");
   }
 
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
index bca2dfe26c5..a08bd76ac79 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
@@ -24,6 +24,8 @@ import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.config.HoodieReaderConfig;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.engine.ReaderContextFactory;
 import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.CompactionOperation;
@@ -46,7 +48,6 @@ import org.apache.hudi.internal.schema.utils.SerDeHelper;
 import org.apache.hudi.io.IOUtils;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;
-import org.apache.hudi.table.EngineBroadcastManager;
 import org.apache.hudi.table.HoodieCompactionHandler;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
@@ -89,16 +90,6 @@ public abstract class HoodieCompactor<T, I, K, O> implements 
Serializable {
    */
   public abstract void maybePersist(HoodieData<WriteStatus> writeStatus, 
HoodieEngineContext context, HoodieWriteConfig config, String instantTime);
 
-  /**
-   * @param context {@link HoodieEngineContext} instance
-   *
-   * @return the {@link EngineBroadcastManager} if available.
-   */
-  public Option<EngineBroadcastManager> 
getEngineBroadcastManager(HoodieEngineContext context,
-                                                                  
HoodieTableMetaClient metaClient) {
-    return Option.empty();
-  }
-
   /**
    * Execute compaction operations and report back status.
    */
@@ -152,11 +143,9 @@ public abstract class HoodieCompactor<T, I, K, O> 
implements Serializable {
         && config.populateMetaFields();                                        
     // Virtual key support by fg reader is not ready
 
     if (useFileGroupReaderBasedCompaction) {
-      Option<EngineBroadcastManager> broadcastManagerOpt = 
getEngineBroadcastManager(context, metaClient);
-      // Broadcast required information.
-      
broadcastManagerOpt.ifPresent(EngineBroadcastManager::prepareAndBroadcast);
+      ReaderContextFactory<T> readerContextFactory = 
context.getReaderContextFactory(metaClient);
       return context.parallelize(operations).map(
-              operation -> compact(compactionHandler, metaClient, config, 
operation, compactionInstantTime, broadcastManagerOpt))
+              operation -> compact(compactionHandler, config, operation, 
compactionInstantTime, readerContextFactory.getContext()))
           .flatMap(List::iterator);
     } else {
       return context.parallelize(operations).map(
@@ -297,14 +286,12 @@ public abstract class HoodieCompactor<T, I, K, O> 
implements Serializable {
    * Execute a single compaction operation and report back status.
    */
   public List<WriteStatus> compact(HoodieCompactionHandler compactionHandler,
-                                   HoodieTableMetaClient metaClient,
                                    HoodieWriteConfig writeConfig,
                                    CompactionOperation operation,
                                    String instantTime,
-                                   Option<EngineBroadcastManager> 
broadcastManagerOpt) throws IOException {
+                                   HoodieReaderContext<T> hoodieReaderContext) 
throws IOException {
     return compactionHandler.compactUsingFileGroupReader(instantTime, 
operation,
-        writeConfig, 
broadcastManagerOpt.get().retrieveFileGroupReaderContext(metaClient.getBasePath()).get(),
-        broadcastManagerOpt.get().retrieveStorageConfig().get());
+        writeConfig, hoodieReaderContext);
   }
 
   public String getMaxInstantTime(HoodieTableMetaClient metaClient) {
diff --git 
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/hadoop/TestHoodieFileGroupReaderOnHive.java
 
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/hadoop/TestHoodieFileGroupReaderOnHive.java
index 93848f3496a..572fd81638b 100644
--- 
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/hadoop/TestHoodieFileGroupReaderOnHive.java
+++ 
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/hadoop/TestHoodieFileGroupReaderOnHive.java
@@ -142,7 +142,7 @@ public class TestHoodieFileGroupReaderOnHive extends 
TestHoodieFileGroupReaderBa
     setupJobconf(jobConf);
     return new HiveHoodieReaderContext(readerCreator, 
getRecordKeyField(metaClient),
         getStoredPartitionFieldNames(new 
JobConf(storageConf.unwrapAs(Configuration.class)), avroSchema),
-        new ObjectInspectorCache(avroSchema, jobConf));
+        new ObjectInspectorCache(avroSchema, jobConf), storageConf);
   }
 
   @Override
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
index 0b922571283..5c53aa8494c 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
@@ -33,6 +33,7 @@ import org.apache.hudi.common.config.SerializableSchema;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.engine.ReaderContextFactory;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.ClusteringOperation;
 import org.apache.hudi.common.model.FileSlice;
@@ -72,10 +73,8 @@ import 
org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
-import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
 import org.apache.hudi.table.BulkInsertPartitioner;
 import org.apache.hudi.table.HoodieTable;
-import org.apache.hudi.table.SparkBroadcastManager;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 import 
org.apache.hudi.table.action.cluster.strategy.ClusteringExecutionStrategy;
 
@@ -455,8 +454,7 @@ public abstract class MultipleSparkJobExecutionStrategy<T>
     SerializableSchema serializableTableSchemaWithMetaFields = new 
SerializableSchema(tableSchemaWithMetaFields);
 
     // broadcast reader context.
-    SparkBroadcastManager broadcastManager = new 
SparkBroadcastManager(getEngineContext(), getHoodieTable().getMetaClient());
-    broadcastManager.prepareAndBroadcast();
+    ReaderContextFactory<InternalRow> readerContextFactory = 
getEngineContext().getReaderContextFactory(getHoodieTable().getMetaClient());
     StructType sparkSchemaWithMetaFields = 
AvroConversionUtils.convertAvroSchemaToStructType(tableSchemaWithMetaFields);
 
     RDD<InternalRow> internalRowRDD = jsc.parallelize(clusteringOps, 
clusteringOps.size()).flatMap(new FlatMapFunction<ClusteringOperation, 
InternalRow>() {
@@ -469,20 +467,17 @@ public abstract class MultipleSparkJobExecutionStrategy<T>
         if (isInternalSchemaPresent) {
           internalSchemaOption = SerDeHelper.fromJson(internalSchemaStr);
         }
-        Option<HoodieReaderContext> readerContextOpt = 
broadcastManager.retrieveFileGroupReaderContext(new StoragePath(basePath));
-        Configuration conf = broadcastManager.retrieveStorageConfig().get();
 
         // instantiate FG reader
-        HoodieFileGroupReader<T> fileGroupReader = new 
HoodieFileGroupReader<>(readerContextOpt.get(),
-            getHoodieTable().getMetaClient().getStorage().newInstance(new 
StoragePath(basePath), new HadoopStorageConfiguration(conf)),
+        HoodieReaderContext<InternalRow> readerContext = 
readerContextFactory.getContext();
+        HoodieFileGroupReader<InternalRow> fileGroupReader = new 
HoodieFileGroupReader<>(readerContext,
+            getHoodieTable().getMetaClient().getStorage().newInstance(new 
StoragePath(basePath), readerContext.getStorageConfiguration()),
             basePath, instantTime, fileSlice, readerSchema, readerSchema, 
internalSchemaOption,
             getHoodieTable().getMetaClient(), 
getHoodieTable().getMetaClient().getTableConfig().getProps(),
             0, Long.MAX_VALUE, usePosition, false);
         fileGroupReader.initRecordIterators();
         // read records from the FG reader
-        HoodieFileGroupReader.HoodieFileGroupReaderIterator<InternalRow> 
recordIterator
-            = 
(HoodieFileGroupReader.HoodieFileGroupReaderIterator<InternalRow>) 
fileGroupReader.getClosableIterator();
-        return recordIterator;
+        return fileGroupReader.getClosableIterator();
       }
     }).rdd();
 
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java
index 1bafb29f207..cc8fd4c4ca7 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java
@@ -25,11 +25,13 @@ import 
org.apache.hudi.common.data.HoodieData.HoodieDataCacheKey;
 import org.apache.hudi.common.data.HoodiePairData;
 import org.apache.hudi.common.engine.EngineProperty;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.engine.ReaderContextFactory;
 import org.apache.hudi.common.function.SerializableBiFunction;
 import org.apache.hudi.common.function.SerializableConsumer;
 import org.apache.hudi.common.function.SerializableFunction;
 import org.apache.hudi.common.function.SerializablePairFlatMapFunction;
 import org.apache.hudi.common.function.SerializablePairFunction;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.Functions;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ImmutablePair;
@@ -48,6 +50,7 @@ import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.Function2;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
 import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.catalyst.InternalRow;
 
 import javax.annotation.concurrent.ThreadSafe;
 
@@ -250,6 +253,11 @@ public class HoodieSparkEngineContext extends 
HoodieEngineContext {
     return HoodieJavaRDD.getJavaRDD(data).aggregate(zeroValue, seqOpFunc, 
combOpFunc);
   }
 
+  @Override
+  public ReaderContextFactory<InternalRow> 
getReaderContextFactory(HoodieTableMetaClient metaClient) {
+    return new SparkReaderContextFactory(this, metaClient);
+  }
+
   public SparkConf getConf() {
     return javaSparkContext.getConf();
   }
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/SparkBroadcastManager.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/SparkReaderContextFactory.java
similarity index 62%
rename from 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/SparkBroadcastManager.java
rename to 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/SparkReaderContextFactory.java
index af7d18c14fd..d2d5d170fd4 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/SparkBroadcastManager.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/SparkReaderContextFactory.java
@@ -7,46 +7,44 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *   http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
-package org.apache.hudi.table;
+package org.apache.hudi.client.common;
 
 import org.apache.hudi.HoodieSparkUtils;
 import org.apache.hudi.SparkAdapterSupport$;
 import org.apache.hudi.SparkFileFormatInternalRowReaderContext;
-import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.client.utils.SparkInternalSchemaConverter;
-import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.engine.ReaderContextFactory;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.InstantFileNameGenerator;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.internal.schema.InternalSchema;
-import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.execution.datasources.FileFormat;
 import org.apache.spark.sql.execution.datasources.parquet.SparkParquetReader;
+import org.apache.spark.sql.hudi.SparkAdapter;
 import org.apache.spark.sql.internal.SQLConf;
 import org.apache.spark.sql.sources.Filter;
 import org.apache.spark.util.SerializableConfiguration;
 
-import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -56,30 +54,18 @@ import scala.Tuple2;
 import scala.collection.JavaConverters;
 
 /**
- * Broadcast variable management for Spark.
+ * Factory that provides the {@link InternalRow} based {@link 
HoodieReaderContext} for reading data into the spark native format.
  */
-public class SparkBroadcastManager extends EngineBroadcastManager {
+class SparkReaderContextFactory implements ReaderContextFactory<InternalRow> {
+  private final Broadcast<SparkParquetReader> parquetReaderBroadcast;
+  private final Broadcast<SerializableConfiguration> configurationBroadcast;
 
-  private final transient HoodieEngineContext context;
-  private final transient HoodieTableMetaClient metaClient;
-
-  protected Option<SparkParquetReader> parquetReaderOpt = Option.empty();
-  protected Broadcast<SQLConf> sqlConfBroadcast;
-  protected Broadcast<SparkParquetReader> parquetReaderBroadcast;
-  protected Broadcast<SerializableConfiguration> configurationBroadcast;
-
-  public SparkBroadcastManager(HoodieEngineContext context, 
HoodieTableMetaClient metaClient) {
-    this.context = context;
-    this.metaClient = metaClient;
+  SparkReaderContextFactory(HoodieSparkEngineContext hoodieSparkEngineContext, 
HoodieTableMetaClient metaClient) {
+    this(hoodieSparkEngineContext, metaClient, new 
TableSchemaResolver(metaClient), SparkAdapterSupport$.MODULE$.sparkAdapter());
   }
 
-  @Override
-  public void prepareAndBroadcast() {
-    if (!(context instanceof HoodieSparkEngineContext)) {
-      throw new HoodieIOException("Expected to be called using Engine's 
context and not local context");
-    }
-
-    HoodieSparkEngineContext hoodieSparkEngineContext = 
(HoodieSparkEngineContext) context;
+  SparkReaderContextFactory(HoodieSparkEngineContext hoodieSparkEngineContext, 
HoodieTableMetaClient metaClient,
+                            TableSchemaResolver resolver, SparkAdapter 
sparkAdapter) {
     SQLConf sqlConf = 
hoodieSparkEngineContext.getSqlContext().sessionState().conf();
     JavaSparkContext jsc = hoodieSparkEngineContext.jsc();
 
@@ -88,49 +74,46 @@ public class SparkBroadcastManager extends 
EngineBroadcastManager {
     scala.collection.immutable.Map<String, String> options =
         scala.collection.immutable.Map$.MODULE$.<String, String>empty()
             .$plus(new Tuple2<>(FileFormat.OPTION_RETURNING_BATCH(), 
Boolean.toString(returningBatch)));
-    TableSchemaResolver resolver = new TableSchemaResolver(metaClient);
     InstantFileNameGenerator fileNameGenerator = 
metaClient.getTimelineLayout().getInstantFileNameGenerator();
     HoodieTimeline timeline = 
metaClient.getCommitsAndCompactionTimeline().filterCompletedInstants();
     Map<String, String> schemaEvolutionConfigs =
         getSchemaEvolutionConfigs(resolver, timeline, fileNameGenerator, 
metaClient.getBasePath().toString());
 
     // Broadcast: SQLConf.
-    sqlConfBroadcast = jsc.broadcast(sqlConf);
     // Broadcast: Configuration.
     Configuration configs = getHadoopConfiguration(jsc.hadoopConfiguration());
-    addSchemaEvolutionConfigs(configs, schemaEvolutionConfigs);
+    schemaEvolutionConfigs.forEach(configs::set);
     configurationBroadcast = jsc.broadcast(new 
SerializableConfiguration(configs));
     // Broadcast: ParquetReader.
     // Spark parquet reader has to be instantiated on the driver and broadcast 
to the executors
-    parquetReaderOpt = 
Option.of(SparkAdapterSupport$.MODULE$.sparkAdapter().createParquetFileReader(
-        false, sqlConfBroadcast.getValue(), options, 
configurationBroadcast.getValue().value()));
-    parquetReaderBroadcast = jsc.broadcast(parquetReaderOpt.get());
+    SparkParquetReader parquetFileReader = 
sparkAdapter.createParquetFileReader(false, sqlConf, options, configs);
+    parquetReaderBroadcast = jsc.broadcast(parquetFileReader);
   }
 
   @Override
-  public Option<HoodieReaderContext> 
retrieveFileGroupReaderContext(StoragePath basePath) {
+  public HoodieReaderContext<InternalRow> getContext() {
     if (parquetReaderBroadcast == null) {
       throw new HoodieException("Spark Parquet reader broadcast is not 
initialized.");
     }
 
+    if (configurationBroadcast == null) {
+      throw new HoodieException("Configuration broadcast is not initialized.");
+    }
+
     SparkParquetReader sparkParquetReader = parquetReaderBroadcast.getValue();
     if (sparkParquetReader != null) {
-      List<Filter> filters = new ArrayList<>();
-      return Option.of(new SparkFileFormatInternalRowReaderContext(
+      List<Filter> filters = Collections.emptyList();
+      return new SparkFileFormatInternalRowReaderContext(
           sparkParquetReader,
           JavaConverters.asScalaBufferConverter(filters).asScala().toSeq(),
-          JavaConverters.asScalaBufferConverter(filters).asScala().toSeq()));
+          JavaConverters.asScalaBufferConverter(filters).asScala().toSeq(),
+          new 
HadoopStorageConfiguration(configurationBroadcast.getValue().value()));
     } else {
       throw new HoodieException("Cannot get the broadcast Spark Parquet 
reader.");
     }
   }
 
-  @Override
-  public Option<Configuration> retrieveStorageConfig() {
-    return Option.of(configurationBroadcast.getValue().value());
-  }
-
-  static Configuration getHadoopConfiguration(Configuration configuration) {
+  private static Configuration getHadoopConfiguration(Configuration 
configuration) {
     // new Configuration() is critical so that we don't run into 
ConcurrentModificatonException
     Configuration hadoopConf = new Configuration(configuration);
     hadoopConf.setBoolean(SQLConf.NESTED_SCHEMA_PRUNING_ENABLED().key(), 
false);
@@ -147,10 +130,10 @@ public class SparkBroadcastManager extends 
EngineBroadcastManager {
     return (new HadoopStorageConfiguration(hadoopConf).getInline()).unwrap();
   }
 
-  static Map<String, String> getSchemaEvolutionConfigs(TableSchemaResolver 
schemaResolver,
-                                                       HoodieTimeline timeline,
-                                                       
InstantFileNameGenerator fileNameGenerator,
-                                                       String basePath) {
+  private static Map<String, String> 
getSchemaEvolutionConfigs(TableSchemaResolver schemaResolver,
+                                                               HoodieTimeline 
timeline,
+                                                               
InstantFileNameGenerator fileNameGenerator,
+                                                               String 
basePath) {
     Option<InternalSchema> internalSchemaOpt = 
schemaResolver.getTableInternalSchemaFromCommitMetadata();
     Map<String, String> configs = new HashMap<>();
     if (internalSchemaOpt.isPresent()) {
@@ -160,10 +143,4 @@ public class SparkBroadcastManager extends 
EngineBroadcastManager {
     }
     return configs;
   }
-
-  static void addSchemaEvolutionConfigs(Configuration configs, Map<String, 
String> schemaEvolutionConfigs) {
-    for (Map.Entry<String, String> entry : schemaEvolutionConfigs.entrySet()) {
-      configs.set(entry.getKey(), entry.getValue());
-    }
-  }
-}
\ No newline at end of file
+}
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/HoodieSparkFileGroupReaderBasedMergeHandle.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/HoodieSparkFileGroupReaderBasedMergeHandle.java
index 0fa5d82727c..7a73e3c7824 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/HoodieSparkFileGroupReaderBasedMergeHandle.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/HoodieSparkFileGroupReaderBasedMergeHandle.java
@@ -49,12 +49,10 @@ import org.apache.hudi.internal.schema.utils.SerDeHelper;
 import org.apache.hudi.io.storage.HoodieFileWriterFactory;
 import org.apache.hudi.keygen.BaseKeyGenerator;
 import org.apache.hudi.storage.StoragePath;
-import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
 
 import org.apache.avro.Schema;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.types.StructType;
 import org.slf4j.Logger;
@@ -82,19 +80,17 @@ import static 
org.apache.hudi.common.config.HoodieReaderConfig.MERGE_USE_RECORD_
 public class HoodieSparkFileGroupReaderBasedMergeHandle<T, I, K, O> extends 
HoodieMergeHandle<T, I, K, O> {
   private static final Logger LOG = 
LoggerFactory.getLogger(HoodieSparkFileGroupReaderBasedMergeHandle.class);
 
-  protected HoodieReaderContext readerContext;
-  protected FileSlice fileSlice;
-  protected Configuration conf;
-  protected HoodieReadStats readStats;
+  private final HoodieReaderContext<T> readerContext;
+  private final FileSlice fileSlice;
+  private HoodieReadStats readStats;
 
   public HoodieSparkFileGroupReaderBasedMergeHandle(HoodieWriteConfig config, 
String instantTime, HoodieTable<T, I, K, O> hoodieTable,
                                                     CompactionOperation 
operation, TaskContextSupplier taskContextSupplier,
                                                     Option<BaseKeyGenerator> 
keyGeneratorOpt,
-                                                    HoodieReaderContext 
readerContext, Configuration conf) {
+                                                    HoodieReaderContext<T> 
readerContext) {
     super(config, instantTime, operation.getPartitionPath(), 
operation.getFileId(), hoodieTable, taskContextSupplier);
     this.keyToNewRecords = Collections.emptyMap();
     this.readerContext = readerContext;
-    this.conf = conf;
     Option<HoodieBaseFile> baseFileOpt =
         operation.getBaseFile(config.getBasePath(), 
operation.getPartitionPath());
     List<HoodieLogFile> logFiles = 
operation.getDeltaFileNames().stream().map(p ->
@@ -189,7 +185,7 @@ public class HoodieSparkFileGroupReaderBasedMergeHandle<T, 
I, K, O> extends Hood
     props.put(HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE.key(), 
String.valueOf(maxMemoryPerCompaction));
     // Initializes file group reader
     try (HoodieFileGroupReader<T> fileGroupReader = new 
HoodieFileGroupReader<>(readerContext,
-        storage.newInstance(hoodieTable.getMetaClient().getBasePath(), new 
HadoopStorageConfiguration(conf)),
+        storage.newInstance(hoodieTable.getMetaClient().getBasePath(), 
readerContext.getStorageConfiguration()),
         hoodieTable.getMetaClient().getBasePath().toString(), instantTime, 
fileSlice,
         writeSchemaWithMetaFields, writeSchemaWithMetaFields, 
internalSchemaOption,
         hoodieTable.getMetaClient(), props, 0, Long.MAX_VALUE, usePosition, 
false)) {
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
index c4ebead38aa..c5200625f56 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
@@ -80,7 +80,6 @@ import 
org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor;
 import org.apache.hudi.table.action.rollback.RestorePlanActionExecutor;
 import org.apache.hudi.table.action.savepoint.SavepointActionExecutor;
 
-import org.apache.hadoop.conf.Configuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -272,12 +271,11 @@ public class HoodieSparkCopyOnWriteTable<T>
   public List<WriteStatus> compactUsingFileGroupReader(String instantTime,
                                                        CompactionOperation 
operation,
                                                        HoodieWriteConfig 
writeConfig,
-                                                       HoodieReaderContext 
readerContext,
-                                                       Configuration conf) {
+                                                       HoodieReaderContext 
readerContext) {
     config.setDefault(writeConfig);
     Option<BaseKeyGenerator> keyGeneratorOpt = 
HoodieSparkKeyGeneratorFactory.createBaseKeyGenerator(config);
     HoodieSparkFileGroupReaderBasedMergeHandle mergeHandle = new 
HoodieSparkFileGroupReaderBasedMergeHandle(config,
-        instantTime, this, operation, taskContextSupplier, keyGeneratorOpt, 
readerContext, conf);
+        instantTime, this, operation, taskContextSupplier, keyGeneratorOpt, 
readerContext);
     mergeHandle.write();
     return mergeHandle.close();
   }
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/HoodieSparkMergeOnReadTableCompactor.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/HoodieSparkMergeOnReadTableCompactor.java
index d6eff9cd0c1..d47eb6d33aa 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/HoodieSparkMergeOnReadTableCompactor.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/HoodieSparkMergeOnReadTableCompactor.java
@@ -25,14 +25,10 @@ import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.WriteOperationType;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.table.EngineBroadcastManager;
 import org.apache.hudi.table.HoodieTable;
-import org.apache.hudi.table.SparkBroadcastManager;
 
 import static 
org.apache.hudi.config.HoodieWriteConfig.WRITE_STATUS_STORAGE_LEVEL_VALUE;
 
@@ -45,12 +41,6 @@ import static 
org.apache.hudi.config.HoodieWriteConfig.WRITE_STATUS_STORAGE_LEVE
 public class HoodieSparkMergeOnReadTableCompactor<T>
     extends HoodieCompactor<T, HoodieData<HoodieRecord<T>>, 
HoodieData<HoodieKey>, HoodieData<WriteStatus>> {
 
-  @Override
-  public Option<EngineBroadcastManager> 
getEngineBroadcastManager(HoodieEngineContext context,
-                                                                  
HoodieTableMetaClient metaClient) {
-    return Option.of(new SparkBroadcastManager(context, metaClient));
-  }
-
   @Override
   public void preCompact(
       HoodieTable table, HoodieTimeline pendingCompactionTimeline, 
WriteOperationType operationType, String instantTime) {
diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
index c39c430451c..8df5bdf57b7 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
@@ -30,6 +30,7 @@ import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.model.HoodieSparkRecord;
 import org.apache.hudi.common.util.HoodieRecordUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.storage.StorageConfiguration;
 
 import org.apache.avro.Schema;
 import org.apache.spark.sql.HoodieInternalRowUtils;
@@ -54,6 +55,10 @@ import static 
org.apache.spark.sql.HoodieInternalRowUtils.getCachedSchema;
  */
 public abstract class BaseSparkInternalRowReaderContext extends 
HoodieReaderContext<InternalRow> {
 
+  protected BaseSparkInternalRowReaderContext(StorageConfiguration<?> 
storageConfig) {
+    super(storageConfig);
+  }
+
   @Override
   public Option<HoodieRecordMerger> getRecordMerger(RecordMergeMode mergeMode, 
String mergeStrategyId, String mergeImplClasses) {
     // TODO(HUDI-7843):
diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
index 7eb154ddd0a..6ed68edf40d 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
@@ -61,7 +61,9 @@ import scala.collection.mutable
  */
 class SparkFileFormatInternalRowReaderContext(parquetFileReader: 
SparkParquetReader,
                                               filters: Seq[Filter],
-                                              requiredFilters: Seq[Filter]) 
extends BaseSparkInternalRowReaderContext {
+                                              requiredFilters: Seq[Filter],
+                                              storageConfiguration: 
StorageConfiguration[_])
+  extends BaseSparkInternalRowReaderContext(storageConfiguration) {
   lazy val sparkAdapter: SparkAdapter = SparkAdapterSupport.sparkAdapter
   private lazy val bootstrapSafeFilters: Seq[Filter] = 
filters.filter(filterIsSafeForBootstrap) ++ requiredFilters
   private val deserializerMap: mutable.Map[Schema, HoodieAvroDeserializer] = 
mutable.Map()
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestSparkBroadcastManager.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/common/TestSparkReaderContextFactory.java
similarity index 57%
rename from 
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestSparkBroadcastManager.java
rename to 
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/common/TestSparkReaderContextFactory.java
index cb33a887c19..1a51ed27510 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestSparkBroadcastManager.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/common/TestSparkReaderContextFactory.java
@@ -7,21 +7,22 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *   http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
-package org.apache.hudi.table;
+package org.apache.hudi.client.common;
 
 import org.apache.hudi.HoodieSparkUtils;
 import org.apache.hudi.client.utils.SparkInternalSchemaConverter;
+import org.apache.hudi.common.engine.HoodieReaderContext;
 import org.apache.hudi.common.model.ActionType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -32,64 +33,44 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.hadoop.fs.inline.InLineFileSystem;
 import org.apache.hudi.internal.schema.InternalSchema;
 import org.apache.hudi.internal.schema.Types;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.testutils.HoodieClientTestBase;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.execution.datasources.FileFormat;
+import org.apache.spark.sql.execution.datasources.parquet.SparkParquetReader;
+import org.apache.spark.sql.hudi.SparkAdapter;
 import org.apache.spark.sql.internal.SQLConf;
 import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
 
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
+
+import scala.Tuple2;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-class TestSparkBroadcastManager {
-  @Test
-  void testGetStorageConfiguration() {
-    Configuration config = new Configuration(false);
-    Configuration createdConfig = 
SparkBroadcastManager.getHadoopConfiguration(config);
-    
assertFalse(createdConfig.getBoolean(SQLConf.NESTED_SCHEMA_PRUNING_ENABLED().key(),
 true));
-    assertFalse(createdConfig.getBoolean(SQLConf.CASE_SENSITIVE().key(), 
true));
-    
assertFalse(createdConfig.getBoolean(SQLConf.PARQUET_BINARY_AS_STRING().key(), 
true));
-    
assertTrue(createdConfig.getBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(), 
false));
-    
assertFalse(createdConfig.getBoolean("spark.sql.legacy.parquet.nanosAsLong", 
true));
-    if (HoodieSparkUtils.gteqSpark3_4()) {
-      
assertFalse(createdConfig.getBoolean("spark.sql.parquet.inferTimestampNTZ.enabled",
 true));
-    }
-
-    String inlineClassName = createdConfig.get("fs." + InLineFileSystem.SCHEME 
+ ".impl");
-    assertEquals(InLineFileSystem.class.getName(), inlineClassName);
-  }
-
-  @Test
-  void testExtraConfigsAdded() {
-    Map<String, String> extraConfigs = new HashMap<>();
-    extraConfigs.put("K1", "V1");
-    Configuration configs = new Configuration(false);
-    SparkBroadcastManager.addSchemaEvolutionConfigs(configs, extraConfigs);
-    assertEquals("V1", configs.get("K1"));
-  }
-
+class TestSparkReaderContextFactory extends HoodieClientTestBase {
   @Test
   void testGetSchemaEvolutionConfigurations() {
     TableSchemaResolver schemaResolver = mock(TableSchemaResolver.class);
     HoodieTimeline timeline = mock(HoodieTimeline.class);
     InstantFileNameGenerator fileNameGenerator = new 
InstantFileNameGeneratorV2();
     String basePath = "any_table_path";
+    metaClient = mock(HoodieTableMetaClient.class, RETURNS_DEEP_STUBS);
+    when(metaClient.getBasePath()).thenReturn(new StoragePath(basePath));
+    
when(metaClient.getCommitsAndCompactionTimeline().filterCompletedInstants()).thenReturn(timeline);
+    
when(metaClient.getTimelineLayout().getInstantFileNameGenerator()).thenReturn(fileNameGenerator);
 
-    // Test when InternalSchema is empty.
-    
when(schemaResolver.getTableInternalSchemaFromCommitMetadata()).thenReturn(Option.empty());
-    Map<String, String> schemaEvolutionConfigs = 
SparkBroadcastManager.getSchemaEvolutionConfigs(
-        schemaResolver, timeline, fileNameGenerator, basePath);
-    assertTrue(schemaEvolutionConfigs.isEmpty());
-
-    // Test when InternalSchema is not empty.
     InstantGeneratorV2 instantGen = new InstantGeneratorV2();
     Types.RecordType record = Types.RecordType.get(Collections.singletonList(
         Types.Field.get(0, "col1", Types.BooleanType.get())));
@@ -103,14 +84,38 @@ class TestSparkBroadcastManager {
     InternalSchema internalSchema = new InternalSchema(record);
     
when(schemaResolver.getTableInternalSchemaFromCommitMetadata()).thenReturn(Option.of(internalSchema));
     when(timeline.getInstants()).thenReturn(instants);
-    schemaEvolutionConfigs = SparkBroadcastManager.getSchemaEvolutionConfigs(
-        schemaResolver, timeline, fileNameGenerator, basePath);
-    assertFalse(schemaEvolutionConfigs.isEmpty());
+    SparkAdapter sparkAdapter = mock(SparkAdapter.class);
+    scala.collection.immutable.Map<String, String> options =
+        scala.collection.immutable.Map$.MODULE$.<String, String>empty()
+            .$plus(new Tuple2<>(FileFormat.OPTION_RETURNING_BATCH(), 
Boolean.toString(true)));
+    ArgumentCaptor<Configuration> configurationArgumentCaptor = 
ArgumentCaptor.forClass(Configuration.class);
+    SparkParquetReader sparkParquetReader = mock(SparkParquetReader.class);
+    when(sparkAdapter.createParquetFileReader(eq(false), 
eq(context.getSqlContext().sessionState().conf()), eq(options), 
configurationArgumentCaptor.capture()))
+        .thenReturn(sparkParquetReader);
+
+    SparkReaderContextFactory sparkHoodieReaderContextFactory = new 
SparkReaderContextFactory(context, metaClient, schemaResolver, sparkAdapter);
+    HoodieReaderContext<InternalRow> readerContext = 
sparkHoodieReaderContextFactory.getContext();
+
+    Configuration createdConfig = 
readerContext.getStorageConfiguration().unwrapAs(Configuration.class);
+    assertEquals(createdConfig, configurationArgumentCaptor.getValue());
+
+    
assertFalse(createdConfig.getBoolean(SQLConf.NESTED_SCHEMA_PRUNING_ENABLED().key(),
 true));
+    assertFalse(createdConfig.getBoolean(SQLConf.CASE_SENSITIVE().key(), 
true));
+    
assertFalse(createdConfig.getBoolean(SQLConf.PARQUET_BINARY_AS_STRING().key(), 
true));
+    
assertTrue(createdConfig.getBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(), 
false));
+    
assertFalse(createdConfig.getBoolean("spark.sql.legacy.parquet.nanosAsLong", 
true));
+    if (HoodieSparkUtils.gteqSpark3_4()) {
+      
assertFalse(createdConfig.getBoolean("spark.sql.parquet.inferTimestampNTZ.enabled",
 true));
+    }
+
+    String inlineClassName = createdConfig.get("fs." + InLineFileSystem.SCHEME 
+ ".impl");
+    assertEquals(InLineFileSystem.class.getName(), inlineClassName);
+
     assertEquals(
         "0001_0005.deltacommit,0002_0006.deltacommit,0003_0007.commit",
-        
schemaEvolutionConfigs.get(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST));
+        
createdConfig.get(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST));
     assertEquals(
-        "any_table_path",
-        
schemaEvolutionConfigs.get(SparkInternalSchemaConverter.HOODIE_TABLE_PATH));
+        basePath,
+        createdConfig.get(SparkInternalSchemaConverter.HOODIE_TABLE_PATH));
   }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
index 5334f06fafa..6a8ecd243b1 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.function.SerializableConsumer;
 import org.apache.hudi.common.function.SerializableFunction;
 import org.apache.hudi.common.function.SerializablePairFlatMapFunction;
 import org.apache.hudi.common.function.SerializablePairFunction;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.Functions;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ImmutablePair;
@@ -130,4 +131,8 @@ public abstract class HoodieEngineContext {
    * @return the result of the aggregation
    */
   public abstract <I, O> O aggregate(HoodieData<I> data, O zeroValue, 
Functions.Function2<O, I, O> seqOp, Functions.Function2<O, O, O> combOp);
+
+  public <T> ReaderContextFactory<T> 
getReaderContextFactory(HoodieTableMetaClient metaClient) {
+    throw new UnsupportedOperationException("Reader context factory is not yet 
supported");
+  }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
index 2751998b111..88d6ca971d3 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.util.LocalAvroSchemaCache;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.StoragePathInfo;
 
@@ -36,7 +37,6 @@ import org.apache.avro.generic.IndexedRecord;
 
 import javax.annotation.Nullable;
 
-import java.io.Closeable;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
@@ -57,8 +57,8 @@ import static 
org.apache.hudi.common.model.HoodieRecord.RECORD_KEY_METADATA_FIEL
  * @param <T> The type of engine-specific record representation, e.g.,{@code 
InternalRow} in Spark
  *            and {@code RowData} in Flink.
  */
-public abstract class HoodieReaderContext<T> implements Closeable {
-
+public abstract class HoodieReaderContext<T> {
+  private final StorageConfiguration<?> storageConfiguration;
   private FileGroupReaderSchemaHandler<T> schemaHandler = null;
   private String tablePath = null;
   private String latestCommitTime = null;
@@ -71,6 +71,10 @@ public abstract class HoodieReaderContext<T> implements 
Closeable {
   // for encoding and decoding schemas to the spillable map
   private final LocalAvroSchemaCache localAvroSchemaCache = 
LocalAvroSchemaCache.getInstance();
 
+  protected HoodieReaderContext(StorageConfiguration<?> storageConfiguration) {
+    this.storageConfiguration = storageConfiguration;
+  }
+
   // Getter and Setter for schemaHandler
   public FileGroupReaderSchemaHandler<T> getSchemaHandler() {
     return schemaHandler;
@@ -143,6 +147,10 @@ public abstract class HoodieReaderContext<T> implements 
Closeable {
     this.shouldMergeUseRecordPosition = shouldMergeUseRecordPosition;
   }
 
+  public StorageConfiguration<?> getStorageConfiguration() {
+    return storageConfiguration;
+  }
+
   // These internal key names are only used in memory for record metadata and 
merging,
   // and should not be persisted to storage.
   public static final String INTERNAL_META_RECORD_KEY = "_0";
@@ -420,11 +428,4 @@ public abstract class HoodieReaderContext<T> implements 
Closeable {
   private Schema decodeAvroSchema(Object versionId) {
     return this.localAvroSchemaCache.getSchema((Integer) 
versionId).orElse(null);
   }
-
-  @Override
-  public void close() {
-    if (this.localAvroSchemaCache != null) {
-      this.localAvroSchemaCache.close();
-    }
-  }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/engine/ReaderContextFactory.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/engine/ReaderContextFactory.java
new file mode 100644
index 00000000000..c6902e5b022
--- /dev/null
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/engine/ReaderContextFactory.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.engine;
+
+import java.io.Serializable;
+
+/**
+ * A factory that will return a {@link HoodieReaderContext<T>} for an engine 
specific data type.
+ * The factory must be serializable by default so the factory can be 
serialized to worker nodes in a distributed compute framework like Spark.
+ * @param <T> The engine specific data type for the reader
+ */
+public interface ReaderContextFactory<T> extends Serializable {
+  HoodieReaderContext<T> getContext();
+}
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java
index ab45b8c7fa0..91ef3681120 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java
@@ -181,8 +181,8 @@ public class HoodieMergedLogRecordReader<T> extends 
BaseHoodieLogRecordReader<T>
   /**
    * Returns the builder for {@code HoodieMergedLogRecordReader}.
    */
-  public static Builder newBuilder() {
-    return new Builder();
+  public static <T> Builder<T> newBuilder() {
+    return new Builder<>();
   }
 
   public long getTotalTimeTakenToReadAndMergeBlocks() {
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
index 58c04d3e733..a57fb777fbd 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
@@ -290,7 +290,7 @@ public final class HoodieFileGroupReader<T> implements 
Closeable {
 
   private void scanLogFiles() {
     String path = readerContext.getTablePath();
-    try (HoodieMergedLogRecordReader logRecordReader = 
HoodieMergedLogRecordReader.newBuilder()
+    try (HoodieMergedLogRecordReader<T> logRecordReader = 
HoodieMergedLogRecordReader.<T>newBuilder()
         .withHoodieReaderContext(readerContext)
         .withStorage(storage)
         .withLogFiles(logFiles)
@@ -319,9 +319,6 @@ public final class HoodieFileGroupReader<T> implements 
Closeable {
     if (recordBuffer != null) {
       recordBuffer.close();
     }
-    if (readerContext != null) {
-      readerContext.close();
-    }
   }
 
   public HoodieFileGroupReaderIterator<T> getClosableIterator() {
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/LocalAvroSchemaCache.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/util/LocalAvroSchemaCache.java
index 2286cba71b8..3d4090534f5 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/util/LocalAvroSchemaCache.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/util/LocalAvroSchemaCache.java
@@ -19,12 +19,9 @@
 package org.apache.hudi.common.util;
 
 import org.apache.avro.Schema;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import javax.annotation.concurrent.NotThreadSafe;
 
-import java.io.Closeable;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -34,8 +31,7 @@ import java.util.Map;
  * A map of {version_id, schema} is maintained.
  */
 @NotThreadSafe
-public class LocalAvroSchemaCache implements Closeable {
-  private static final Logger LOG = 
LoggerFactory.getLogger(LocalAvroSchemaCache.class);
+public class LocalAvroSchemaCache {
   private final Map<Integer, Schema> versionIdToSchema; // the mapping from 
version_id -> schema
   private final Map<Schema, Integer> schemaToVersionId; // the mapping from 
schema -> version_id
 
@@ -63,10 +59,4 @@ public class LocalAvroSchemaCache implements Closeable {
   public Option<Schema> getSchema(Integer versionId) {
     return Option.ofNullable(this.versionIdToSchema.get(versionId));
   }
-
-  @Override
-  public void close() {
-    this.schemaToVersionId.clear();
-    this.versionIdToSchema.clear();
-  }
 }
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieTestReaderContext.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieTestReaderContext.java
index 9f97bccece6..1c536e1b7a7 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieTestReaderContext.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieTestReaderContext.java
@@ -59,6 +59,7 @@ public class HoodieTestReaderContext extends 
HoodieReaderContext<IndexedRecord>
   public HoodieTestReaderContext(
       Option<HoodieRecordMerger> customMerger,
       Option<String> payloadClass) {
+    super(null);
     this.customMerger = customMerger;
     this.payloadClass = payloadClass;
   }
diff --git 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
index b0eaf6cb25c..b37c7d910dd 100644
--- 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
+++ 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
@@ -37,6 +37,7 @@ import 
org.apache.hudi.hadoop.utils.HoodieArrayWritableAvroUtils;
 import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
 import org.apache.hudi.hadoop.utils.ObjectInspectorCache;
 import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.StoragePathInfo;
 
@@ -95,7 +96,9 @@ public class HiveHoodieReaderContext extends 
HoodieReaderContext<ArrayWritable>
   protected 
HiveHoodieReaderContext(HoodieFileGroupReaderBasedRecordReader.HiveReaderCreator
 readerCreator,
                                     String recordKeyField,
                                     List<String> partitionCols,
-                                    ObjectInspectorCache objectInspectorCache) 
{
+                                    ObjectInspectorCache objectInspectorCache,
+                                    StorageConfiguration<?> 
storageConfiguration) {
+    super(storageConfiguration);
     this.readerCreator = readerCreator;
     this.partitionCols = partitionCols;
     this.partitionColSet = new HashSet<>(this.partitionCols);
diff --git 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileGroupReaderBasedRecordReader.java
 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileGroupReaderBasedRecordReader.java
index 9100ffad49d..39deb6a0971 100644
--- 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileGroupReaderBasedRecordReader.java
+++ 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileGroupReaderBasedRecordReader.java
@@ -38,6 +38,7 @@ import org.apache.hudi.hadoop.realtime.RealtimeSplit;
 import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils;
 import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
 import org.apache.hudi.hadoop.utils.ObjectInspectorCache;
+import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
 
 import org.apache.avro.Schema;
 import org.apache.hadoop.fs.FileSystem;
@@ -124,7 +125,7 @@ public class HoodieFileGroupReaderBasedRecordReader 
implements RecordReader<Null
     Schema requestedSchema = createRequestedSchema(tableSchema, jobConfCopy);
     this.readerContext = new HiveHoodieReaderContext(readerCreator, 
getRecordKeyField(metaClient),
         getStoredPartitionFieldNames(jobConfCopy, tableSchema),
-        new ObjectInspectorCache(tableSchema, jobConfCopy));
+        new ObjectInspectorCache(tableSchema, jobConfCopy), new 
HadoopStorageConfiguration(jobConfCopy));
     this.arrayWritable = new ArrayWritable(Writable.class, new 
Writable[requestedSchema.getFields().size()]);
     TypedProperties props = metaClient.getTableConfig().getProps();
     jobConf.forEach(e -> {
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
index a5afcf42564..3bceb6eb4b8 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
@@ -170,7 +170,7 @@ class 
HoodieFileGroupReaderBasedParquetFileFormat(tablePath: String,
             .getSparkPartitionedFileUtils.getPathFromPartitionedFile(file))
           fileSliceMapping.getSlice(filegroupName) match {
             case Some(fileSlice) if !isCount && (requiredSchema.nonEmpty || 
fileSlice.getLogFiles.findAny().isPresent) =>
-              val readerContext = new 
SparkFileFormatInternalRowReaderContext(parquetFileReader.value, filters, 
requiredFilters)
+              val readerContext = new 
SparkFileFormatInternalRowReaderContext(parquetFileReader.value, filters, 
requiredFilters, storageConf)
               val metaClient: HoodieTableMetaClient = HoodieTableMetaClient
                 .builder().setConf(storageConf).setBasePath(tablePath).build
               val props = metaClient.getTableConfig.getProps
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/spark/execution/datasources/parquet/TestSparkFileFormatInternalRowReaderContext.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/spark/execution/datasources/parquet/TestSparkFileFormatInternalRowReaderContext.scala
index fec096920aa..5c7f1816389 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/spark/execution/datasources/parquet/TestSparkFileFormatInternalRowReaderContext.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/spark/execution/datasources/parquet/TestSparkFileFormatInternalRowReaderContext.scala
@@ -78,7 +78,7 @@ class TestSparkFileFormatInternalRowReaderContext extends 
SparkClientFunctionalT
   def testConvertValueToEngineType(): Unit = {
     val reader = Mockito.mock(classOf[SparkParquetReader])
     val stringValue = "string_value"
-    val sparkReaderContext = new 
SparkFileFormatInternalRowReaderContext(reader, Seq.empty, Seq.empty)
+    val sparkReaderContext = new 
SparkFileFormatInternalRowReaderContext(reader, Seq.empty, Seq.empty, 
storageConf())
     assertEquals(1, sparkReaderContext.convertValueToEngineType(1))
     assertEquals(1L, sparkReaderContext.convertValueToEngineType(1L))
     assertEquals(1.1f, sparkReaderContext.convertValueToEngineType(1.1f))
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/PartitionBucketIndexManager.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/PartitionBucketIndexManager.scala
index 2d8811f9612..24c4663583e 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/PartitionBucketIndexManager.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/PartitionBucketIndexManager.scala
@@ -22,7 +22,7 @@ import 
org.apache.hudi.DataSourceWriteOptions.{BULK_INSERT_OPERATION_OPT_VAL, EN
 import org.apache.hudi.avro.HoodieAvroUtils
 import org.apache.hudi.client.SparkRDDWriteClient
 import org.apache.hudi.common.config.{HoodieMetadataConfig, 
HoodieReaderConfig, SerializableSchema}
-import org.apache.hudi.common.engine.HoodieEngineContext
+import org.apache.hudi.common.engine.{HoodieEngineContext, 
ReaderContextFactory}
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.{PartitionBucketIndexHashingConfig, 
WriteOperationType}
 import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
@@ -35,7 +35,6 @@ import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.index.bucket.partition.{PartitionBucketIndexCalculator, 
PartitionBucketIndexUtils}
 import org.apache.hudi.internal.schema.InternalSchema
 import org.apache.hudi.storage.StoragePath
-import org.apache.hudi.table.SparkBroadcastManager
 
 import org.apache.avro.Schema
 import org.apache.spark.internal.Logging
@@ -213,9 +212,7 @@ class PartitionBucketIndexManager extends BaseProcedure
           throw new HoodieException("Failed to get table schema during 
clustering", e)
       }
 
-      // broadcast reader context.
-      val broadcastManager = new SparkBroadcastManager(context, metaClient)
-      broadcastManager.prepareAndBroadcast()
+      val readerContextFactory: ReaderContextFactory[InternalRow] = 
context.getReaderContextFactory(metaClient)
       val sparkSchemaWithMetaFields = 
AvroConversionUtils.convertAvroSchemaToStructType(tableSchemaWithMetaFields)
 
       val res: RDD[InternalRow] = if (allFileSlice.isEmpty) {
@@ -227,10 +224,9 @@ class PartitionBucketIndexManager extends BaseProcedure
         spark.sparkContext.parallelize(allFileSlice, 
allFileSlice.size).flatMap(fileSlice => {
           // instantiate other supporting cast
           val readerSchema = serializableTableSchemaWithMetaFields.get
-          val readerContextOpt = 
broadcastManager.retrieveFileGroupReaderContext(basePath)
           val internalSchemaOption: Option[InternalSchema] = Option.empty()
           // instantiate FG reader
-          val fileGroupReader = new 
HoodieFileGroupReader(readerContextOpt.get(),
+          val fileGroupReader = new 
HoodieFileGroupReader(readerContextFactory.getContext,
             metaClient.getStorage,
             basePath.toString,
             latestInstantTime.requestedTime(),
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
index b9f71c687d2..a5290bcab0f 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
@@ -99,7 +99,7 @@ class TestHoodieFileGroupReaderOnSpark extends 
TestHoodieFileGroupReaderBase[Int
 
   override def getHoodieReaderContext(tablePath: String, avroSchema: Schema, 
storageConf: StorageConfiguration[_]): HoodieReaderContext[InternalRow] = {
     val reader = sparkAdapter.createParquetFileReader(vectorized = false, 
spark.sessionState.conf, Map.empty, 
storageConf.unwrapAs(classOf[Configuration]))
-    new SparkFileFormatInternalRowReaderContext(reader, Seq.empty, Seq.empty)
+    new SparkFileFormatInternalRowReaderContext(reader, Seq.empty, Seq.empty, 
getStorageConf)
   }
 
   override def commitToTable(recordList: util.List[HoodieRecord[_]], 
operation: String, options: util.Map[String, String]): Unit = {
@@ -149,7 +149,7 @@ class TestHoodieFileGroupReaderOnSpark extends 
TestHoodieFileGroupReaderBase[Int
   @Test
   def testGetOrderingValue(): Unit = {
     val reader = Mockito.mock(classOf[SparkParquetReader])
-    val sparkReaderContext = new 
SparkFileFormatInternalRowReaderContext(reader, Seq.empty, Seq.empty)
+    val sparkReaderContext = new 
SparkFileFormatInternalRowReaderContext(reader, Seq.empty, Seq.empty, 
getStorageConf)
     val orderingFieldName = "col2"
     val avroSchema = new Schema.Parser().parse(
       "{\"type\": \"record\",\"name\": \"test\",\"namespace\": 
\"org.apache.hudi\",\"fields\": ["
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestSpark35RecordPositionMetadataColumn.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestSpark35RecordPositionMetadataColumn.scala
index 9e9dd995156..e650f8fd9b3 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestSpark35RecordPositionMetadataColumn.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestSpark35RecordPositionMetadataColumn.scala
@@ -92,7 +92,7 @@ class TestSpark35RecordPositionMetadataColumn extends 
SparkClientFunctionalTestH
     assertFalse(allBaseFiles.isEmpty)
 
     val requiredSchema = 
SparkFileFormatInternalRowReaderContext.getAppliedRequiredSchema(dataSchema,
-      new SparkFileFormatInternalRowReaderContext(reader, Seq.empty, 
Seq.empty).supportsParquetRowIndex)
+      new SparkFileFormatInternalRowReaderContext(reader, Seq.empty, 
Seq.empty, storageConf()).supportsParquetRowIndex)
 
     // Confirm if the schema is as expected.
     if (HoodieSparkUtils.gteqSpark3_5) {

Reply via email to