This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new ea4f14c2851 [HUDI-7744] Introduce IOFactory and a config to set the 
factory (#11192)
ea4f14c2851 is described below

commit ea4f14c28515ea9a60e9fe74fe18ef0cb497a1b0
Author: Jon Vexler <[email protected]>
AuthorDate: Mon May 13 13:37:09 2024 -0700

    [HUDI-7744] Introduce IOFactory and a config to set the factory (#11192)
    
    Co-authored-by: Jonathan Vexler <=>
    Co-authored-by: Y Ethan Guo <[email protected]>
---
 .../hudi/client/timeline/LSMTimelineWriter.java    |  5 +-
 .../org/apache/hudi/index/HoodieIndexUtils.java    |  5 +-
 .../java/org/apache/hudi/io/HoodieMergeHandle.java |  5 +-
 .../java/org/apache/hudi/io/HoodieReadHandle.java  |  6 +-
 .../table/action/commit/HoodieMergeHelper.java     |  9 +--
 .../GenericRecordValidationTestUtils.java          |  7 ++-
 .../run/strategy/JavaExecutionStrategy.java        |  6 +-
 .../hudi/client/TestJavaHoodieBackedMetadata.java  |  6 +-
 .../MultipleSparkJobExecutionStrategy.java         |  8 +--
 .../strategy/SingleSparkJobExecutionStrategy.java  |  5 +-
 .../hudi/io/storage/HoodieSparkIOFactory.java      | 49 ++++++++++++++++
 .../bootstrap/ParquetBootstrapMetadataHandler.java |  4 +-
 .../functional/TestHoodieBackedMetadata.java       | 10 ++--
 .../functional/TestHoodieBackedTableMetadata.java  |  4 +-
 .../hudi/common/config/HoodieStorageConfig.java    |  8 +++
 .../table/log/block/HoodieHFileDataBlock.java      | 20 +++----
 .../table/log/block/HoodieParquetDataBlock.java    |  4 +-
 .../table/timeline/HoodieArchivedTimeline.java     |  5 +-
 .../hudi/io/storage/HoodieFileReaderFactory.java   | 27 ---------
 .../hudi/io/storage/HoodieFileWriterFactory.java   | 28 +--------
 .../apache/hudi/io/storage/HoodieIOFactory.java    | 51 ++++++++++++++++
 .../hudi/metadata/HoodieBackedTableMetadata.java   |  4 +-
 .../hudi/metadata/HoodieTableMetadataUtil.java     | 14 ++---
 .../testutils/reader/HoodieTestReaderContext.java  |  4 +-
 .../hudi/sink/clustering/ClusteringOperator.java   |  7 ++-
 .../org/apache/hudi/common/util/HFileUtils.java    |  5 +-
 .../hudi/io/storage/HoodieHadoopIOFactory.java     | 68 ++++++++++++++++++++++
 .../io/hadoop/TestHoodieAvroFileReaderFactory.java |  8 ++-
 .../hudi/io/hadoop/TestHoodieOrcReaderWriter.java  |  4 +-
 .../hudi/hadoop/HoodieHFileRecordReader.java       |  8 ++-
 .../utils/HoodieRealtimeRecordReaderUtils.java     |  8 ++-
 .../reader/DFSHoodieDatasetInputReader.java        |  5 +-
 .../main/scala/org/apache/hudi/DefaultSource.scala |  6 +-
 .../scala/org/apache/hudi/HoodieBaseRelation.scala |  4 +-
 .../utilities/HoodieMetadataTableValidator.java    |  4 +-
 35 files changed, 283 insertions(+), 138 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/LSMTimelineWriter.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/LSMTimelineWriter.java
index b7ad9bd57d8..a720819ee88 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/LSMTimelineWriter.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/LSMTimelineWriter.java
@@ -39,9 +39,9 @@ import org.apache.hudi.exception.HoodieCommitException;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.io.hadoop.HoodieAvroParquetReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.hudi.io.storage.HoodieFileWriter;
 import org.apache.hudi.io.storage.HoodieFileWriterFactory;
+import org.apache.hudi.io.storage.HoodieIOFactory;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.table.HoodieTable;
 
@@ -269,7 +269,8 @@ public class LSMTimelineWriter {
     try (HoodieFileWriter writer = openWriter(new 
StoragePath(metaClient.getArchivePath(), compactedFileName))) {
       for (String fileName : candidateFiles) {
         // Read the input source file
-        try (HoodieAvroParquetReader reader = (HoodieAvroParquetReader) 
HoodieFileReaderFactory.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO)
+        try (HoodieAvroParquetReader reader = (HoodieAvroParquetReader) 
HoodieIOFactory.getIOFactory(metaClient.getStorageConf())
+            .getReaderFactory(HoodieRecord.HoodieRecordType.AVRO)
             .getFileReader(config, metaClient.getStorageConf(), new 
StoragePath(metaClient.getArchivePath(), fileName))) {
           // Read the meta entry
           try (ClosableIterator<IndexedRecord> iterator = 
reader.getIndexedRecordIterator(HoodieLSMTimelineInstant.getClassSchema(), 
HoodieLSMTimelineInstant.getClassSchema())) {
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
index afde21e64ca..49e6a49f5c0 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
@@ -45,7 +45,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIndexException;
 import org.apache.hudi.io.HoodieMergedReadHandle;
 import org.apache.hudi.io.storage.HoodieFileReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.io.storage.HoodieIOFactory;
 import org.apache.hudi.metadata.MetadataPartitionType;
 import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
@@ -183,7 +183,8 @@ public class HoodieIndexUtils {
                                                             
StorageConfiguration<?> configuration) throws HoodieIndexException {
     checkArgument(FSUtils.isBaseFile(filePath));
     List<Pair<String, Long>> foundRecordKeys = new ArrayList<>();
-    try (HoodieFileReader fileReader = 
HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO)
+    try (HoodieFileReader fileReader = 
HoodieIOFactory.getIOFactory(configuration)
+        .getReaderFactory(HoodieRecordType.AVRO)
         .getFileReader(DEFAULT_HUDI_CONFIG_FOR_READER, configuration, 
filePath)) {
       // Load all rowKeys from the file, to double-confirm
       if (!candidateRecordKeys.isEmpty()) {
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index 7eb0a5fb52f..f04a015e423 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -44,9 +44,9 @@ import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.io.storage.HoodieFileReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.hudi.io.storage.HoodieFileWriter;
 import org.apache.hudi.io.storage.HoodieFileWriterFactory;
+import org.apache.hudi.io.storage.HoodieIOFactory;
 import org.apache.hudi.keygen.BaseKeyGenerator;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.table.HoodieTable;
@@ -470,7 +470,8 @@ public class HoodieMergeHandle<T, I, K, O> extends 
HoodieWriteHandle<T, I, K, O>
     }
 
     long oldNumWrites = 0;
-    try (HoodieFileReader reader = 
HoodieFileReaderFactory.getReaderFactory(this.recordMerger.getRecordType())
+    try (HoodieFileReader reader = 
HoodieIOFactory.getIOFactory(storage.getConf())
+        .getReaderFactory(this.recordMerger.getRecordType())
         .getFileReader(config, hoodieTable.getStorageConf(), oldFilePath)) {
       oldNumWrites = reader.getTotalRecords();
     } catch (IOException e) {
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieReadHandle.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieReadHandle.java
index 5f9afc1bad1..01678b68e96 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieReadHandle.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieReadHandle.java
@@ -23,7 +23,7 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.io.storage.HoodieFileReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.io.storage.HoodieIOFactory;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.table.HoodieTable;
 
@@ -69,12 +69,12 @@ public abstract class HoodieReadHandle<T, I, K, O> extends 
HoodieIOHandle<T, I,
   }
 
   protected HoodieFileReader createNewFileReader() throws IOException {
-    return 
HoodieFileReaderFactory.getReaderFactory(this.config.getRecordMerger().getRecordType())
+    return 
HoodieIOFactory.getIOFactory(storage.getConf()).getReaderFactory(this.config.getRecordMerger().getRecordType())
         .getFileReader(config, hoodieTable.getStorageConf(), 
getLatestBaseFile().getStoragePath());
   }
 
   protected HoodieFileReader createNewFileReader(HoodieBaseFile 
hoodieBaseFile) throws IOException {
-    return 
HoodieFileReaderFactory.getReaderFactory(this.config.getRecordMerger().getRecordType())
+    return 
HoodieIOFactory.getIOFactory(storage.getConf()).getReaderFactory(this.config.getRecordMerger().getRecordType())
         .getFileReader(config, hoodieTable.getStorageConf(), 
hoodieBaseFile.getStoragePath());
   }
 }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
index 38383fd7a88..a13253bc1b0 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
@@ -37,7 +37,7 @@ import 
org.apache.hudi.internal.schema.utils.InternalSchemaUtils;
 import org.apache.hudi.internal.schema.utils.SerDeHelper;
 import org.apache.hudi.io.HoodieMergeHandle;
 import org.apache.hudi.io.storage.HoodieFileReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.io.storage.HoodieIOFactory;
 import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.table.HoodieTable;
@@ -80,7 +80,7 @@ public class HoodieMergeHelper<T> extends BaseMergeHelper {
 
     StorageConfiguration<?> storageConf = table.getStorageConf().newInstance();
     HoodieRecord.HoodieRecordType recordType = 
table.getConfig().getRecordMerger().getRecordType();
-    HoodieFileReader baseFileReader = HoodieFileReaderFactory
+    HoodieFileReader baseFileReader = HoodieIOFactory.getIOFactory(storageConf)
         .getReaderFactory(recordType)
         .getFileReader(writeConfig, storageConf, mergeHandle.getOldFilePath());
     HoodieFileReader bootstrapFileReader = null;
@@ -112,9 +112,10 @@ public class HoodieMergeHelper<T> extends BaseMergeHelper {
       if (baseFile.getBootstrapBaseFile().isPresent()) {
         StoragePath bootstrapFilePath = 
baseFile.getBootstrapBaseFile().get().getStoragePath();
         StorageConfiguration<?> bootstrapFileConfig = 
table.getStorageConf().newInstance();
-        bootstrapFileReader = 
HoodieFileReaderFactory.getReaderFactory(recordType).newBootstrapFileReader(
+        bootstrapFileReader = 
HoodieIOFactory.getIOFactory(storageConf).getReaderFactory(recordType).newBootstrapFileReader(
             baseFileReader,
-            
HoodieFileReaderFactory.getReaderFactory(recordType).getFileReader(writeConfig, 
bootstrapFileConfig, bootstrapFilePath),
+            
HoodieIOFactory.getIOFactory(storageConf).getReaderFactory(recordType)
+                .getFileReader(writeConfig, bootstrapFileConfig, 
bootstrapFilePath),
             mergeHandle.getPartitionFields(),
             mergeHandle.getPartitionValues());
         recordSchema = mergeHandle.getWriterSchemaWithMetaFields();
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/GenericRecordValidationTestUtils.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/GenericRecordValidationTestUtils.java
index 9a51c6204b3..aba943d6d5a 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/GenericRecordValidationTestUtils.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/GenericRecordValidationTestUtils.java
@@ -30,7 +30,7 @@ import org.apache.hudi.exception.HoodieValidationException;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
 import org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.io.storage.HoodieIOFactory;
 import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
 
@@ -145,9 +145,10 @@ public class GenericRecordValidationTestUtils {
   public static Stream<GenericRecord> readHFile(Configuration conf, String[] 
paths) {
     List<GenericRecord> valuesAsList = new LinkedList<>();
     for (String path : paths) {
+      StorageConfiguration storageConf = HadoopFSUtils.getStorageConf(conf);
       try (HoodieAvroHFileReaderImplBase reader = 
(HoodieAvroHFileReaderImplBase)
-          
HoodieFileReaderFactory.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO)
-              .getFileReader(DEFAULT_HUDI_CONFIG_FOR_READER, 
HadoopFSUtils.getStorageConf(conf), new StoragePath(path), 
HoodieFileFormat.HFILE)) {
+          
HoodieIOFactory.getIOFactory(storageConf).getReaderFactory(HoodieRecord.HoodieRecordType.AVRO)
+              .getFileReader(DEFAULT_HUDI_CONFIG_FOR_READER, storageConf, new 
StoragePath(path), HoodieFileFormat.HFILE)) {
         
valuesAsList.addAll(HoodieAvroHFileReaderImplBase.readAllRecords(reader)
             .stream().map(e -> (GenericRecord) 
e).collect(Collectors.toList()));
       } catch (IOException e) {
diff --git 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
index 02021dcc405..5b216807932 100644
--- 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
+++ 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
@@ -43,7 +43,7 @@ import 
org.apache.hudi.execution.bulkinsert.JavaBulkInsertInternalPartitionerFac
 import org.apache.hudi.execution.bulkinsert.JavaCustomColumnsSortPartitioner;
 import org.apache.hudi.io.IOUtils;
 import org.apache.hudi.io.storage.HoodieFileReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.io.storage.HoodieIOFactory;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.table.BulkInsertPartitioner;
 import org.apache.hudi.table.HoodieTable;
@@ -192,7 +192,7 @@ public abstract class JavaExecutionStrategy<T>
 
         baseFileReader = 
StringUtils.isNullOrEmpty(clusteringOp.getDataFilePath())
             ? Option.empty()
-            : Option.of(HoodieFileReaderFactory.getReaderFactory(recordType)
+            : 
Option.of(HoodieIOFactory.getIOFactory(table.getStorageConf()).getReaderFactory(recordType)
             .getFileReader(config, table.getStorageConf(), new 
StoragePath(clusteringOp.getDataFilePath())));
         HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig();
         Iterator<HoodieRecord<T>> fileSliceReader = new 
HoodieFileSliceReader(baseFileReader, scanner, readerSchema, 
tableConfig.getPreCombineField(), writeConfig.getRecordMerger(),
@@ -221,7 +221,7 @@ public abstract class JavaExecutionStrategy<T>
   private List<HoodieRecord<T>> 
readRecordsForGroupBaseFiles(List<ClusteringOperation> clusteringOps) {
     List<HoodieRecord<T>> records = new ArrayList<>();
     clusteringOps.forEach(clusteringOp -> {
-      try (HoodieFileReader baseFileReader = 
HoodieFileReaderFactory.getReaderFactory(recordType)
+      try (HoodieFileReader baseFileReader = 
HoodieIOFactory.getIOFactory(getHoodieTable().getStorageConf()).getReaderFactory(recordType)
           .getFileReader(getHoodieTable().getConfig(), 
getHoodieTable().getStorageConf(), new 
StoragePath(clusteringOp.getDataFilePath()))) {
         Schema readerSchema = HoodieAvroUtils.addMetadataFields(new 
Schema.Parser().parse(getWriteConfig().getSchema()));
         Iterator<HoodieRecord> recordIterator = 
baseFileReader.getRecordIterator(readerSchema);
diff --git 
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
 
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
index 2a5b6e33171..91d9566fa60 100644
--- 
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
+++ 
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
@@ -86,7 +86,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieMetadataException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.io.storage.HoodieIOFactory;
 import org.apache.hudi.metadata.FileSystemBackedTableMetadata;
 import org.apache.hudi.metadata.HoodieBackedTableMetadata;
 import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
@@ -549,7 +549,7 @@ public class TestJavaHoodieBackedMetadata extends 
TestHoodieMetadataBase {
     List<FileSlice> fileSlices = 
table.getSliceView().getLatestFileSlices("files").collect(Collectors.toList());
     HoodieBaseFile baseFile = fileSlices.get(0).getBaseFile().get();
     HoodieAvroHFileReaderImplBase hoodieHFileReader = 
(HoodieAvroHFileReaderImplBase)
-        
HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO).getFileReader(
+        
HoodieIOFactory.getIOFactory(context.getStorageConf()).getReaderFactory(HoodieRecordType.AVRO).getFileReader(
             writeConfig, context.getStorageConf(), new 
StoragePath(baseFile.getPath()));
     List<IndexedRecord> records = 
HoodieAvroHFileReaderImplBase.readAllRecords(hoodieHFileReader);
     records.forEach(entry -> {
@@ -983,7 +983,7 @@ public class TestJavaHoodieBackedMetadata extends 
TestHoodieMetadataBase {
     final HoodieBaseFile baseFile = fileSlices.get(0).getBaseFile().get();
 
     HoodieAvroHFileReaderImplBase hoodieHFileReader = 
(HoodieAvroHFileReaderImplBase)
-        
HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO).getFileReader(
+        
HoodieIOFactory.getIOFactory(storageConf).getReaderFactory(HoodieRecordType.AVRO).getFileReader(
             table.getConfig(), context.getStorageConf(), new 
StoragePath(baseFile.getPath()));
     List<IndexedRecord> records = 
HoodieAvroHFileReaderImplBase.readAllRecords(hoodieHFileReader);
     records.forEach(entry -> {
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
index 739abe319b9..2331db1ff42 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
@@ -56,7 +56,6 @@ import 
org.apache.hudi.execution.bulkinsert.RowCustomColumnsSortPartitioner;
 import org.apache.hudi.execution.bulkinsert.RowSpatialCurveSortPartitioner;
 import org.apache.hudi.io.IOUtils;
 import org.apache.hudi.io.storage.HoodieFileReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.hudi.keygen.BaseKeyGenerator;
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
 import org.apache.hudi.storage.StorageConfiguration;
@@ -94,6 +93,7 @@ import java.util.stream.Stream;
 import static 
org.apache.hudi.client.utils.SparkPartitionUtils.getPartitionFieldVals;
 import static org.apache.hudi.common.config.HoodieCommonConfig.TIMESTAMP_AS_OF;
 import static 
org.apache.hudi.config.HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS;
+import static 
org.apache.hudi.io.storage.HoodieSparkIOFactory.getHoodieSparkIOFactory;
 
 /**
  * Clustering strategy to submit multiple spark jobs and union the results.
@@ -385,7 +385,7 @@ public abstract class MultipleSparkJobExecutionStrategy<T>
 
   private HoodieFileReader 
getBaseOrBootstrapFileReader(StorageConfiguration<?> storageConf, String 
bootstrapBasePath, Option<String[]> partitionFields, ClusteringOperation 
clusteringOp)
       throws IOException {
-    HoodieFileReader baseFileReader = 
HoodieFileReaderFactory.getReaderFactory(recordType)
+    HoodieFileReader baseFileReader = 
getHoodieSparkIOFactory().getReaderFactory(recordType)
         .getFileReader(writeConfig, storageConf, new 
StoragePath(clusteringOp.getDataFilePath()));
     // handle bootstrap path
     if (StringUtils.nonEmpty(clusteringOp.getBootstrapFilePath()) && 
StringUtils.nonEmpty(bootstrapBasePath)) {
@@ -397,9 +397,9 @@ public abstract class MultipleSparkJobExecutionStrategy<T>
         partitionValues = getPartitionFieldVals(partitionFields, 
partitionFilePath, bootstrapBasePath, baseFileReader.getSchema(),
             storageConf.unwrapAs(Configuration.class));
       }
-      baseFileReader = 
HoodieFileReaderFactory.getReaderFactory(recordType).newBootstrapFileReader(
+      baseFileReader = 
getHoodieSparkIOFactory().getReaderFactory(recordType).newBootstrapFileReader(
           baseFileReader,
-          HoodieFileReaderFactory.getReaderFactory(recordType).getFileReader(
+          getHoodieSparkIOFactory().getReaderFactory(recordType).getFileReader(
               writeConfig, storageConf, new StoragePath(bootstrapFilePath)), 
partitionFields,
           partitionValues);
     }
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java
index 50eb9d4bd7a..06ba64dad89 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java
@@ -39,7 +39,6 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.data.HoodieJavaRDD;
 import org.apache.hudi.exception.HoodieClusteringException;
 import org.apache.hudi.io.storage.HoodieFileReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.hudi.keygen.BaseKeyGenerator;
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
 import org.apache.hudi.storage.StoragePath;
@@ -64,6 +63,8 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
 
+import static 
org.apache.hudi.io.storage.HoodieSparkIOFactory.getHoodieSparkIOFactory;
+
 /**
  * Clustering strategy to submit single spark jobs.
  * MultipleSparkJobExecution strategy is not ideal for use cases that require 
large number of clustering groups
@@ -146,7 +147,7 @@ public abstract class SingleSparkJobExecutionStrategy<T>
       Schema readerSchema = HoodieAvroUtils.addMetadataFields(new 
Schema.Parser().parse(getWriteConfig().getSchema()));
       Iterable<HoodieRecord<T>> indexedRecords = () -> {
         try {
-          HoodieFileReader baseFileReader = 
HoodieFileReaderFactory.getReaderFactory(recordType)
+          HoodieFileReader baseFileReader = 
getHoodieSparkIOFactory().getReaderFactory(recordType)
               .getFileReader(writeConfig, getHoodieTable().getStorageConf(), 
new StoragePath(clusteringOp.getDataFilePath()));
           Option<BaseKeyGenerator> keyGeneratorOp = 
HoodieSparkKeyGeneratorFactory.createBaseKeyGenerator(writeConfig);
           // NOTE: Record have to be cloned here to make sure if it holds 
low-level engine-specific
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkIOFactory.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkIOFactory.java
new file mode 100644
index 00000000000..16431d61551
--- /dev/null
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkIOFactory.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import org.apache.hudi.common.model.HoodieRecord;
+
+/**
+ * Creates readers and writers for SPARK and AVRO record payloads
+ */
+public class HoodieSparkIOFactory extends HoodieHadoopIOFactory {
+  private static final HoodieSparkIOFactory HOODIE_SPARK_IO_FACTORY = new 
HoodieSparkIOFactory();
+
+  public static HoodieSparkIOFactory getHoodieSparkIOFactory() {
+    return HOODIE_SPARK_IO_FACTORY;
+  }
+
+  @Override
+  public HoodieFileReaderFactory 
getReaderFactory(HoodieRecord.HoodieRecordType recordType) {
+    if (recordType == HoodieRecord.HoodieRecordType.SPARK) {
+      return new HoodieSparkFileReaderFactory();
+    }
+    return super.getReaderFactory(recordType);
+  }
+
+  @Override
+  public HoodieFileWriterFactory 
getWriterFactory(HoodieRecord.HoodieRecordType recordType) {
+    if (recordType == HoodieRecord.HoodieRecordType.SPARK) {
+      return new HoodieSparkFileWriterFactory();
+    }
+    return super.getWriterFactory(recordType);
+  }
+}
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java
index 151e88432e3..adc6a456ac9 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java
@@ -31,7 +31,6 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.io.HoodieBootstrapHandle;
 import org.apache.hudi.io.storage.HoodieFileReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.hudi.keygen.KeyGeneratorInterface;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.table.HoodieTable;
@@ -58,6 +57,7 @@ import java.io.IOException;
 import java.util.function.Function;
 
 import static 
org.apache.hudi.io.HoodieBootstrapHandle.METADATA_BOOTSTRAP_RECORD_SCHEMA;
+import static 
org.apache.hudi.io.storage.HoodieSparkIOFactory.getHoodieSparkIOFactory;
 
 class ParquetBootstrapMetadataHandler extends BaseBootstrapMetadataHandler {
 
@@ -82,7 +82,7 @@ class ParquetBootstrapMetadataHandler extends 
BaseBootstrapMetadataHandler {
                                   Schema schema) throws Exception {
     HoodieRecord.HoodieRecordType recordType = 
table.getConfig().getRecordMerger().getRecordType();
 
-    HoodieFileReader reader = 
HoodieFileReaderFactory.getReaderFactory(recordType)
+    HoodieFileReader reader = 
getHoodieSparkIOFactory().getReaderFactory(recordType)
         .getFileReader(table.getConfig(), table.getStorageConf(), 
sourceFilePath);
 
     HoodieExecutor<Void> executor = null;
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index 2c145f5b10e..7c84efe6bf6 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -90,7 +90,6 @@ import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieMetadataException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.hudi.metadata.FileSystemBackedTableMetadata;
 import org.apache.hudi.metadata.HoodieBackedTableMetadata;
 import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
@@ -172,6 +171,7 @@ import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.ROLLBACK_ACTI
 import static 
org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
 import static 
org.apache.hudi.common.testutils.HoodieTestDataGenerator.getNextCommitTime;
 import static 
org.apache.hudi.config.HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS;
+import static 
org.apache.hudi.io.storage.HoodieSparkIOFactory.getHoodieSparkIOFactory;
 import static 
org.apache.hudi.metadata.HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP;
 import static 
org.apache.hudi.metadata.HoodieTableMetadata.getMetadataTableBasePath;
 import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.deleteMetadataTable;
@@ -826,7 +826,7 @@ public class TestHoodieBackedMetadata extends 
TestHoodieMetadataBase {
     List<FileSlice> fileSlices = 
table.getSliceView().getLatestFileSlices("files").collect(Collectors.toList());
     HoodieBaseFile baseFile = fileSlices.get(0).getBaseFile().get();
     HoodieAvroHFileReaderImplBase hoodieHFileReader = 
(HoodieAvroHFileReaderImplBase)
-        
HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO).getFileReader(
+        
getHoodieSparkIOFactory().getReaderFactory(HoodieRecordType.AVRO).getFileReader(
             table.getConfig(), context.getStorageConf(), new 
StoragePath(baseFile.getPath()));
     List<IndexedRecord> records = 
HoodieAvroHFileReaderImplBase.readAllRecords(hoodieHFileReader);
     records.forEach(entry -> {
@@ -1449,9 +1449,9 @@ public class TestHoodieBackedMetadata extends 
TestHoodieMetadataBase {
     }
     final HoodieBaseFile baseFile = fileSlices.get(0).getBaseFile().get();
 
-    HoodieAvroHFileReaderImplBase hoodieHFileReader = 
(HoodieAvroHFileReaderImplBase)
-        
HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO).getFileReader(
-            table.getConfig(), context.getStorageConf(), new 
StoragePath(baseFile.getPath()));
+    HoodieAvroHFileReaderImplBase hoodieHFileReader = 
(HoodieAvroHFileReaderImplBase) getHoodieSparkIOFactory()
+        .getReaderFactory(HoodieRecordType.AVRO)
+        .getFileReader(table.getConfig(), context.getStorageConf(), new 
StoragePath(baseFile.getPath()));
     List<IndexedRecord> records = 
HoodieAvroHFileReaderImplBase.readAllRecords(hoodieHFileReader);
     records.forEach(entry -> {
       if (enableMetaFields) {
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
index 9e8521d669b..f711636d514 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
@@ -46,7 +46,6 @@ import 
org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.ExternalSpillableMap;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.hudi.metadata.HoodieBackedTableMetadata;
 import org.apache.hudi.metadata.HoodieMetadataLogRecordReader;
 import org.apache.hudi.metadata.HoodieMetadataPayload;
@@ -87,6 +86,7 @@ import static 
org.apache.hudi.common.model.WriteOperationType.COMPACT;
 import static org.apache.hudi.common.model.WriteOperationType.INSERT;
 import static org.apache.hudi.common.model.WriteOperationType.UPSERT;
 import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.CLEAN_ACTION;
+import static 
org.apache.hudi.io.storage.HoodieSparkIOFactory.getHoodieSparkIOFactory;
 import static org.apache.hudi.metadata.MetadataPartitionType.FILES;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -527,7 +527,7 @@ public class TestHoodieBackedTableMetadata extends 
TestHoodieMetadataBase {
     final HoodieBaseFile baseFile = fileSlices.get(0).getBaseFile().get();
 
     HoodieAvroHFileReaderImplBase hoodieHFileReader = 
(HoodieAvroHFileReaderImplBase)
-        
HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO).getFileReader(
+        
getHoodieSparkIOFactory().getReaderFactory(HoodieRecordType.AVRO).getFileReader(
             table.getConfig(), context.getStorageConf(), new 
StoragePath(baseFile.getPath()));
     List<IndexedRecord> records = 
HoodieAvroHFileReaderImplBase.readAllRecords(hoodieHFileReader);
     records.forEach(entry -> {
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java
index f3ad183def4..0309aee00a9 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java
@@ -235,6 +235,14 @@ public class HoodieStorageConfig extends HoodieConfig {
           + "and it is loaded at runtime. This is only required when trying to 
"
           + "override the existing write context when 
`hoodie.datasource.write.row.writer.enable=true`.");
 
+  public static final ConfigProperty<String> HOODIE_IO_FACTORY_CLASS = 
ConfigProperty
+      .key("hoodie.io.factory.class")
+      .defaultValue("org.apache.hudi.io.storage.HoodieHadoopIOFactory")
+      .markAdvanced()
+      .sinceVersion("0.15.0")
+      .withDocumentation("The fully-qualified class name of the factory class 
to return readers and writers of files used "
+          + "by Hudi. The provided class should implement 
`org.apache.hudi.io.storage.HoodieIOFactory`.");
+
 
 
   /**
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
index e63a1f9872a..3d6e2a81b0b 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
@@ -33,7 +33,7 @@ import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.io.SeekableDataInputStream;
 import org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase;
 import org.apache.hudi.io.storage.HoodieFileReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.io.storage.HoodieIOFactory;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.HoodieStorageUtils;
 import org.apache.hudi.storage.StorageConfiguration;
@@ -192,11 +192,10 @@ public class HoodieHFileDataBlock extends HoodieDataBlock 
{
     StorageConfiguration<?> storageConf = 
getBlockContentLocation().get().getStorageConf().getInline();
     HoodieStorage storage = HoodieStorageUtils.getStorage(pathForReader, 
storageConf);
     // Read the content
-    try (HoodieFileReader reader =
-             
HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO).getContentReader(
-
-                 hFileReaderConfig, storageConf, pathForReader, 
HoodieFileFormat.HFILE, storage, content,
-                 Option.of(getSchemaFromHeader()))) {
+    try (HoodieFileReader reader = HoodieIOFactory.getIOFactory(storageConf)
+        .getReaderFactory(HoodieRecordType.AVRO)
+        .getContentReader(hFileReaderConfig, storageConf, pathForReader,
+            HoodieFileFormat.HFILE, storage, content, 
Option.of(getSchemaFromHeader()))) {
       return unsafeCast(reader.getRecordIterator(readerSchema));
     }
   }
@@ -209,7 +208,7 @@ public class HoodieHFileDataBlock extends HoodieDataBlock {
     HoodieStorage storage = HoodieStorageUtils.getStorage(pathForReader, 
storageConf);
     // Read the content
     try (HoodieAvroHFileReaderImplBase reader = (HoodieAvroHFileReaderImplBase)
-        
HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO).getContentReader(
+        
HoodieIOFactory.getIOFactory(storageConf).getReaderFactory(HoodieRecordType.AVRO).getContentReader(
             hFileReaderConfig, storageConf, pathForReader, 
HoodieFileFormat.HFILE, storage, content,
             Option.of(getSchemaFromHeader()))) {
       return unsafeCast(reader.getIndexedRecordIterator(readerSchema, 
readerSchema));
@@ -231,10 +230,9 @@ public class HoodieHFileDataBlock extends HoodieDataBlock {
         blockContentLoc.getContentPositionInLogFile(),
         blockContentLoc.getBlockSize());
 
-    try (final HoodieAvroHFileReaderImplBase reader = 
(HoodieAvroHFileReaderImplBase)
-        
HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO).getFileReader(
-            hFileReaderConfig, inlineConf, inlinePath, HoodieFileFormat.HFILE,
-            Option.of(getSchemaFromHeader()))) {
+    try (final HoodieAvroHFileReaderImplBase reader = 
(HoodieAvroHFileReaderImplBase) HoodieIOFactory.getIOFactory(inlineConf)
+        .getReaderFactory(HoodieRecordType.AVRO)
+        .getFileReader(hFileReaderConfig, inlineConf, inlinePath, 
HoodieFileFormat.HFILE, Option.of(getSchemaFromHeader()))) {
       // Get writer's schema from the header
       final ClosableIterator<HoodieRecord<IndexedRecord>> recordIterator =
           fullKey ? reader.getRecordsByKeysIterator(sortedKeys, readerSchema) 
: reader.getRecordsByKeyPrefixIterator(sortedKeys, readerSchema);
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
index 2997390dc34..c29b396b6aa 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
@@ -26,9 +26,9 @@ import 
org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.io.SeekableDataInputStream;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.hudi.io.storage.HoodieFileWriter;
 import org.apache.hudi.io.storage.HoodieFileWriterFactory;
+import org.apache.hudi.io.storage.HoodieIOFactory;
 import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.inline.InLineFSUtils;
@@ -145,7 +145,7 @@ public class HoodieParquetDataBlock extends HoodieDataBlock 
{
 
     Schema writerSchema = new 
Schema.Parser().parse(this.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
 
-    ClosableIterator<HoodieRecord<T>> iterator = 
HoodieFileReaderFactory.getReaderFactory(type)
+    ClosableIterator<HoodieRecord<T>> iterator = 
HoodieIOFactory.getIOFactory(inlineConf).getReaderFactory(type)
         .getFileReader(DEFAULT_HUDI_CONFIG_FOR_READER, inlineConf, 
inlineLogFilePath, PARQUET, Option.empty())
         .getRecordIterator(writerSchema, readerSchema);
     return iterator;
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
index 42f8a6a2753..28767c1047a 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
@@ -26,7 +26,7 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.io.storage.HoodieAvroFileReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.io.storage.HoodieIOFactory;
 import org.apache.hudi.storage.StoragePath;
 
 import org.apache.avro.Schema;
@@ -266,7 +266,8 @@ public class HoodieArchivedTimeline extends 
HoodieDefaultTimeline {
           .filter(fileName -> filter == null || 
LSMTimeline.isFileInRange(filter, fileName))
           .parallel().forEach(fileName -> {
             // Read the archived file
-            try (HoodieAvroFileReader reader = (HoodieAvroFileReader) 
HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO)
+            try (HoodieAvroFileReader reader = (HoodieAvroFileReader) 
HoodieIOFactory.getIOFactory(metaClient.getStorageConf())
+                .getReaderFactory(HoodieRecordType.AVRO)
                 .getFileReader(DEFAULT_HUDI_CONFIG_FOR_READER, 
metaClient.getStorageConf(), new StoragePath(metaClient.getArchivePath(), 
fileName))) {
               try (ClosableIterator<IndexedRecord> iterator = 
reader.getIndexedRecordIterator(HoodieLSMTimelineInstant.getClassSchema(), 
readSchema)) {
                 while (iterator.hasNext()) {
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java
 
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java
index c285f04a2b2..8637c468fdd 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java
@@ -22,10 +22,7 @@ import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.config.HoodieReaderConfig;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieFileFormat;
-import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ReflectionUtils;
-import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
@@ -43,30 +40,6 @@ import static 
org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
  */
 public class HoodieFileReaderFactory {
 
-  public static HoodieFileReaderFactory 
getReaderFactory(HoodieRecord.HoodieRecordType recordType) {
-    switch (recordType) {
-      case AVRO:
-
-        try {
-          Class<?> clazz =
-              
ReflectionUtils.getClass("org.apache.hudi.io.hadoop.HoodieAvroFileReaderFactory");
-          return (HoodieFileReaderFactory) clazz.newInstance();
-        } catch (IllegalArgumentException | IllegalAccessException | 
InstantiationException e) {
-          throw new HoodieException("Unable to create 
HoodieAvroFileReaderFactory", e);
-        }
-      case SPARK:
-        try {
-          Class<?> clazz =
-              
ReflectionUtils.getClass("org.apache.hudi.io.storage.HoodieSparkFileReaderFactory");
-          return (HoodieFileReaderFactory) clazz.newInstance();
-        } catch (IllegalArgumentException | IllegalAccessException | 
InstantiationException e) {
-          throw new HoodieException("Unable to create 
HoodieSparkFileReaderFactory", e);
-        }
-      default:
-        throw new UnsupportedOperationException(recordType + " record type not 
supported yet.");
-    }
-  }
-
   public HoodieFileReader getFileReader(HoodieConfig hoodieConfig, 
StorageConfiguration<?> conf, StoragePath path) throws IOException {
     final String extension = FSUtils.getFileExtension(path.toString());
     if (PARQUET.getFileExtension().equals(extension)) {
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
 
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
index 69a8924f508..2218af0d426 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
@@ -25,10 +25,7 @@ import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieFileFormat;
-import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
-import org.apache.hudi.common.util.ReflectionUtils;
-import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
 
@@ -43,39 +40,18 @@ import static 
org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
 
 public class HoodieFileWriterFactory {
 
-  private static HoodieFileWriterFactory 
getWriterFactory(HoodieRecord.HoodieRecordType recordType) {
-    switch (recordType) {
-      case AVRO:
-        try {
-          Class<?> clazz = 
ReflectionUtils.getClass("org.apache.hudi.io.hadoop.HoodieAvroFileWriterFactory");
-          return (HoodieFileWriterFactory) clazz.newInstance();
-        } catch (IllegalAccessException | IllegalArgumentException | 
InstantiationException e) {
-          throw new HoodieException("Unable to create 
HoodieAvroFileWriterFactory", e);
-        }
-      case SPARK:
-        try {
-          Class<?> clazz = 
ReflectionUtils.getClass("org.apache.hudi.io.storage.HoodieSparkFileWriterFactory");
-          return (HoodieFileWriterFactory) clazz.newInstance();
-        } catch (IllegalAccessException | IllegalArgumentException | 
InstantiationException e) {
-          throw new HoodieException("Unable to create 
HoodieSparkFileWriterFactory", e);
-        }
-      default:
-        throw new UnsupportedOperationException(recordType + " record type not 
supported yet.");
-    }
-  }
-
   public static <T, I, K, O> HoodieFileWriter getFileWriter(
       String instantTime, StoragePath path, StorageConfiguration<?> conf, 
HoodieConfig config, Schema schema,
       TaskContextSupplier taskContextSupplier, HoodieRecordType recordType) 
throws IOException {
     final String extension = FSUtils.getFileExtension(path.getName());
-    HoodieFileWriterFactory factory = getWriterFactory(recordType);
+    HoodieFileWriterFactory factory = 
HoodieIOFactory.getIOFactory(conf).getWriterFactory(recordType);
     return factory.getFileWriterByFormat(extension, instantTime, path, conf, 
config, schema, taskContextSupplier);
   }
 
   public static <T, I, K, O> HoodieFileWriter getFileWriter(HoodieFileFormat 
format, OutputStream outputStream,
                                                             
StorageConfiguration<?> conf, HoodieConfig config, Schema schema, 
HoodieRecordType recordType)
       throws IOException {
-    HoodieFileWriterFactory factory = getWriterFactory(recordType);
+    HoodieFileWriterFactory factory = 
HoodieIOFactory.getIOFactory(conf).getWriterFactory(recordType);
     return factory.getFileWriterByFormat(format, outputStream, conf, config, 
schema);
   }
 
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieIOFactory.java 
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieIOFactory.java
new file mode 100644
index 00000000000..3e715366134
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieIOFactory.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import org.apache.hudi.common.config.HoodieStorageConfig;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.storage.StorageConfiguration;
+
+/**
+ * Base class to get {@link HoodieFileReaderFactory} and {@link HoodieFileWriterFactory}.
+ */
+public abstract class HoodieIOFactory {
+
+  public static HoodieIOFactory getIOFactory(StorageConfiguration<?> 
storageConf) {
+    String ioFactoryClass = 
storageConf.getString(HoodieStorageConfig.HOODIE_IO_FACTORY_CLASS.key())
+        .orElse(HoodieStorageConfig.HOODIE_IO_FACTORY_CLASS.defaultValue());
+    return getIOFactory(ioFactoryClass);
+  }
+
+  private static HoodieIOFactory getIOFactory(String ioFactoryClass) {
+    try {
+      return ReflectionUtils.loadClass(ioFactoryClass);
+    } catch (Exception e) {
+      throw new HoodieException("Unable to create " + ioFactoryClass, e);
+    }
+  }
+
+  public abstract HoodieFileReaderFactory 
getReaderFactory(HoodieRecord.HoodieRecordType recordType);
+
+  public abstract HoodieFileWriterFactory 
getWriterFactory(HoodieRecord.HoodieRecordType recordType);
+
+}
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index 9a525a8142c..c4510a3edac 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -50,7 +50,7 @@ import org.apache.hudi.exception.TableNotFoundException;
 import org.apache.hudi.expression.BindVisitor;
 import org.apache.hudi.expression.Expression;
 import org.apache.hudi.internal.schema.Types;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.io.storage.HoodieIOFactory;
 import org.apache.hudi.io.storage.HoodieSeekingFileReader;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.util.Transient;
@@ -606,7 +606,7 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
     Option<HoodieBaseFile> basefile = slice.getBaseFile();
     if (basefile.isPresent()) {
       StoragePath baseFilePath = basefile.get().getStoragePath();
-      baseFileReader = (HoodieSeekingFileReader<?>) 
HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO)
+      baseFileReader = (HoodieSeekingFileReader<?>) 
HoodieIOFactory.getIOFactory(storageConf).getReaderFactory(HoodieRecordType.AVRO)
           .getFileReader(DEFAULT_HUDI_CONFIG_FOR_READER, getStorageConf(), 
baseFilePath);
       baseFileOpenMs = timer.endTimer();
       LOG.info(String.format("Opened metadata base file from %s at instant %s 
in %d ms", baseFilePath,
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 3484fe8ae57..12dc1b33ecb 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -82,7 +82,7 @@ import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieMetadataException;
 import org.apache.hudi.exception.HoodieNotSupportedException;
 import org.apache.hudi.io.storage.HoodieFileReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.io.storage.HoodieIOFactory;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.HoodieStorageUtils;
 import org.apache.hudi.storage.StorageConfiguration;
@@ -516,9 +516,9 @@ public class HoodieTableMetadataUtil {
       }
 
       final StoragePath writeFilePath = new 
StoragePath(dataMetaClient.getBasePathV2(), pathWithPartition);
-      try (HoodieFileReader fileReader =
-               
HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO).getFileReader(
-                   hoodieConfig, dataMetaClient.getStorageConf(), 
writeFilePath)) {
+      try (HoodieFileReader fileReader = 
HoodieIOFactory.getIOFactory(dataMetaClient.getStorageConf())
+          .getReaderFactory(HoodieRecordType.AVRO).getFileReader(hoodieConfig,
+              dataMetaClient.getStorageConf(), writeFilePath)) {
         try {
           final BloomFilter fileBloomFilter = fileReader.readBloomFilter();
           if (fileBloomFilter == null) {
@@ -961,7 +961,7 @@ public class HoodieTableMetadataUtil {
 
   private static ByteBuffer readBloomFilter(StorageConfiguration<?> conf, 
StoragePath filePath) throws IOException {
     HoodieConfig hoodieConfig = getReaderConfigs(conf);
-    try (HoodieFileReader fileReader = 
HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO)
+    try (HoodieFileReader fileReader = 
HoodieIOFactory.getIOFactory(conf).getReaderFactory(HoodieRecordType.AVRO)
         .getFileReader(hoodieConfig, conf, filePath)) {
       final BloomFilter fileBloomFilter = fileReader.readBloomFilter();
       if (fileBloomFilter == null) {
@@ -1764,7 +1764,7 @@ public class HoodieTableMetadataUtil {
 
       final String fileId = baseFile.getFileId();
       final String instantTime = baseFile.getCommitTime();
-      HoodieFileReader reader = 
HoodieFileReaderFactory.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO)
+      HoodieFileReader reader = 
HoodieIOFactory.getIOFactory(configuration).getReaderFactory(HoodieRecord.HoodieRecordType.AVRO)
           .getFileReader(config, configuration, dataFilePath);
       return getHoodieRecordIterator(reader.getRecordKeyIterator(), forDelete, 
partition, fileId, instantTime);
     });
@@ -1825,7 +1825,7 @@ public class HoodieTableMetadataUtil {
       final String fileId = baseFile.getFileId();
       final String instantTime = baseFile.getCommitTime();
       HoodieConfig hoodieConfig = getReaderConfigs(storageConf);
-      HoodieFileReader reader = 
HoodieFileReaderFactory.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO)
+      HoodieFileReader reader = 
HoodieIOFactory.getIOFactory(storageConf).getReaderFactory(HoodieRecord.HoodieRecordType.AVRO)
           .getFileReader(hoodieConfig, storageConf, dataFilePath);
       return getHoodieRecordIterator(reader.getRecordKeyIterator(), forDelete, 
partition, fileId, instantTime);
     });
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieTestReaderContext.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieTestReaderContext.java
index 6eb6733b04b..0266c036cf5 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieTestReaderContext.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieTestReaderContext.java
@@ -35,7 +35,7 @@ import org.apache.hudi.common.util.SpillableMapUtils;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.io.storage.HoodieAvroFileReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.io.storage.HoodieIOFactory;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.HoodieStorageUtils;
 import org.apache.hudi.storage.StorageConfiguration;
@@ -81,7 +81,7 @@ public class HoodieTestReaderContext extends 
HoodieReaderContext<IndexedRecord>
       Schema requiredSchema,
       StorageConfiguration<?> conf
   ) throws IOException {
-    HoodieAvroFileReader reader = (HoodieAvroFileReader) 
HoodieFileReaderFactory
+    HoodieAvroFileReader reader = (HoodieAvroFileReader) 
HoodieIOFactory.getIOFactory(conf)
         
.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO).getFileReader(new 
HoodieConfig(),
             conf, filePath, HoodieFileFormat.PARQUET, Option.empty());
     return reader.getIndexedRecordIterator(dataSchema, requiredSchema);
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
index 93a2f5d45d2..3709c27a8b8 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
@@ -44,6 +44,7 @@ import org.apache.hudi.io.IOUtils;
 import org.apache.hudi.io.storage.HoodieAvroFileReader;
 import org.apache.hudi.io.storage.HoodieFileReader;
 import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.io.storage.HoodieIOFactory;
 import org.apache.hudi.metrics.FlinkClusteringMetrics;
 import org.apache.hudi.sink.bulk.BulkInsertWriterHelper;
 import org.apache.hudi.sink.bulk.sort.SortOperatorGen;
@@ -273,7 +274,8 @@ public class ClusteringOperator extends 
TableStreamOperator<ClusteringCommitEven
       try {
         Option<HoodieFileReader> baseFileReader = 
StringUtils.isNullOrEmpty(clusteringOp.getDataFilePath())
             ? Option.empty()
-            : 
Option.of(HoodieFileReaderFactory.getReaderFactory(table.getConfig().getRecordMerger().getRecordType())
+            : Option.of(HoodieIOFactory.getIOFactory(table.getStorageConf())
+            
.getReaderFactory(table.getConfig().getRecordMerger().getRecordType())
             .getFileReader(table.getConfig(), table.getStorageConf(), new 
StoragePath(clusteringOp.getDataFilePath())));
         HoodieMergedLogRecordScanner scanner = 
HoodieMergedLogRecordScanner.newBuilder()
             .withStorage(table.getMetaClient().getStorage())
@@ -320,7 +322,8 @@ public class ClusteringOperator extends 
TableStreamOperator<ClusteringCommitEven
     List<Iterator<RowData>> iteratorsForPartition = 
clusteringOps.stream().map(clusteringOp -> {
       Iterable<IndexedRecord> indexedRecords = () -> {
         try {
-          HoodieFileReaderFactory fileReaderFactory = 
HoodieFileReaderFactory.getReaderFactory(table.getConfig().getRecordMerger().getRecordType());
+          HoodieFileReaderFactory fileReaderFactory = 
HoodieIOFactory.getIOFactory(table.getStorageConf())
+              
.getReaderFactory(table.getConfig().getRecordMerger().getRecordType());
           HoodieAvroFileReader fileReader = (HoodieAvroFileReader) 
fileReaderFactory.getFileReader(
               table.getConfig(), table.getStorageConf(), new 
StoragePath(clusteringOp.getDataFilePath()));
 
diff --git 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/HFileUtils.java 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/HFileUtils.java
index 48f7e41e047..f8177a869b1 100644
--- 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/HFileUtils.java
+++ 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/HFileUtils.java
@@ -27,7 +27,7 @@ import 
org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.io.storage.HoodieFileReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.io.storage.HoodieIOFactory;
 import org.apache.hudi.keygen.BaseKeyGenerator;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StorageConfiguration;
@@ -101,7 +101,8 @@ public class HFileUtils extends BaseFileUtils {
     LOG.info("Reading schema from {}", filePath);
 
     try (HoodieFileReader fileReader =
-             
HoodieFileReaderFactory.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO)
+             HoodieIOFactory.getIOFactory(configuration)
+                 .getReaderFactory(HoodieRecord.HoodieRecordType.AVRO)
                  .getFileReader(
                      ConfigUtils.DEFAULT_HUDI_CONFIG_FOR_READER,
                      configuration,
diff --git 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/HoodieHadoopIOFactory.java
 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/HoodieHadoopIOFactory.java
new file mode 100644
index 00000000000..65c8d028adb
--- /dev/null
+++ 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/HoodieHadoopIOFactory.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.io.hadoop.HoodieAvroFileReaderFactory;
+import org.apache.hudi.io.hadoop.HoodieAvroFileWriterFactory;
+
+/**
+ * Creates readers and writers for AVRO record payloads.
+ * Currently uses reflection to support SPARK record payloads but
+ * this ability should be removed with [HUDI-7746]
+ */
+public class HoodieHadoopIOFactory extends HoodieIOFactory {
+
+  @Override
+  public HoodieFileReaderFactory 
getReaderFactory(HoodieRecord.HoodieRecordType recordType) {
+    switch (recordType) {
+      case AVRO:
+        return new HoodieAvroFileReaderFactory();
+      case SPARK:
+        //TODO: remove this case [HUDI-7746]
+        try {
+          return 
ReflectionUtils.loadClass("org.apache.hudi.io.storage.HoodieSparkFileReaderFactory");
+        } catch (Exception e) {
+          throw new HoodieException("Unable to create 
HoodieSparkFileReaderFactory", e);
+        }
+      default:
+        throw new UnsupportedOperationException(recordType + " record type not 
supported");
+    }
+  }
+
+  @Override
+  public HoodieFileWriterFactory 
getWriterFactory(HoodieRecord.HoodieRecordType recordType) {
+    switch (recordType) {
+      case AVRO:
+        return new HoodieAvroFileWriterFactory();
+      case SPARK:
+        //TODO: remove this case [HUDI-7746]
+        try {
+          return 
ReflectionUtils.loadClass("org.apache.hudi.io.storage.HoodieSparkFileWriterFactory");
+        } catch (Exception e) {
+          throw new HoodieException("Unable to create 
HoodieSparkFileWriterFactory", e);
+        }
+      default:
+        throw new UnsupportedOperationException(recordType + " record type not 
supported");
+    }
+  }
+}
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieAvroFileReaderFactory.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieAvroFileReaderFactory.java
index 7faf84a1ee5..85731674cd6 100644
--- 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieAvroFileReaderFactory.java
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieAvroFileReaderFactory.java
@@ -23,6 +23,7 @@ import 
org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.io.storage.HoodieFileReader;
 import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.io.storage.HoodieIOFactory;
 import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
 
@@ -48,7 +49,7 @@ public class TestHoodieAvroFileReaderFactory {
     // parquet file format.
     final StorageConfiguration<?> storageConf = 
HadoopFSUtils.getStorageConf(new Configuration());
     final StoragePath parquetPath = new 
StoragePath("/partition/path/f1_1-0-1_000.parquet");
-    HoodieFileReader parquetReader = 
HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO)
+    HoodieFileReader parquetReader = 
HoodieIOFactory.getIOFactory(storageConf).getReaderFactory(HoodieRecordType.AVRO)
         .getFileReader(DEFAULT_HUDI_CONFIG_FOR_READER, storageConf, 
parquetPath);
     assertTrue(parquetReader instanceof HoodieAvroParquetReader);
 
@@ -56,14 +57,15 @@ public class TestHoodieAvroFileReaderFactory {
     final StoragePath logPath = new StoragePath(
         
"/partition/path/f.b51192a8-574b-4a85-b246-bcfec03ac8bf_100.log.2_1-0-1");
     final Throwable thrown = assertThrows(UnsupportedOperationException.class, 
() -> {
-      HoodieFileReader logWriter = 
HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO)
+      HoodieFileReader logWriter = 
HoodieIOFactory.getIOFactory(storageConf).getReaderFactory(HoodieRecordType.AVRO)
           .getFileReader(DEFAULT_HUDI_CONFIG_FOR_READER, storageConf, logPath);
     }, "should fail since log storage reader is not supported yet.");
     assertTrue(thrown.getMessage().contains("format not supported yet."));
 
     // Orc file format.
     final StoragePath orcPath = new 
StoragePath("/partition/path/f1_1-0-1_000.orc");
-    HoodieFileReader orcReader = 
HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO)
+    HoodieFileReader orcReader = HoodieIOFactory.getIOFactory(storageConf)
+        .getReaderFactory(HoodieRecordType.AVRO)
         .getFileReader(DEFAULT_HUDI_CONFIG_FOR_READER, storageConf, orcPath);
     assertTrue(orcReader instanceof HoodieAvroOrcReader);
   }
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieOrcReaderWriter.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieOrcReaderWriter.java
index 6a94a32ed3c..0cf0ca9d445 100644
--- 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieOrcReaderWriter.java
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieOrcReaderWriter.java
@@ -28,7 +28,7 @@ import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.io.storage.HoodieAvroFileReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.io.storage.HoodieIOFactory;
 import org.apache.hudi.io.storage.HoodieOrcConfig;
 import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
@@ -78,7 +78,7 @@ public class TestHoodieOrcReaderWriter extends 
TestHoodieReaderWriterBase {
   @Override
   protected HoodieAvroFileReader createReader(
       StorageConfiguration<?> conf) throws Exception {
-    return (HoodieAvroFileReader) 
HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO)
+    return (HoodieAvroFileReader) 
HoodieIOFactory.getIOFactory(conf).getReaderFactory(HoodieRecordType.AVRO)
         .getFileReader(DEFAULT_HUDI_CONFIG_FOR_READER, conf, getFilePath());
   }
 
diff --git 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileRecordReader.java
 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileRecordReader.java
index 97177ab260d..85e9fcac311 100644
--- 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileRecordReader.java
+++ 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileRecordReader.java
@@ -26,7 +26,8 @@ import 
org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
 import org.apache.hudi.io.storage.HoodieFileReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.io.storage.HoodieIOFactory;
+import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
 
 import org.apache.avro.Schema;
@@ -56,8 +57,9 @@ public class HoodieHFileRecordReader implements 
RecordReader<NullWritable, Array
   public HoodieHFileRecordReader(Configuration conf, InputSplit split, JobConf 
job) throws IOException {
     FileSplit fileSplit = (FileSplit) split;
     StoragePath path = convertToStoragePath(fileSplit.getPath());
-    HoodieConfig hoodieConfig = 
getReaderConfigs(HadoopFSUtils.getStorageConf(conf));
-    reader = 
HoodieFileReaderFactory.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO)
+    StorageConfiguration<?> storageConf = HadoopFSUtils.getStorageConf(conf);
+    HoodieConfig hoodieConfig = getReaderConfigs(storageConf);
+    reader = 
HoodieIOFactory.getIOFactory(storageConf).getReaderFactory(HoodieRecord.HoodieRecordType.AVRO)
         .getFileReader(hoodieConfig, HadoopFSUtils.getStorageConf(conf), path, 
HoodieFileFormat.HFILE, Option.empty());
 
     schema = reader.getSchema();
diff --git 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
index a66f3264e33..a2951973755 100644
--- 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
+++ 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
@@ -26,7 +26,8 @@ import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.io.storage.HoodieFileReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.io.storage.HoodieIOFactory;
+import org.apache.hudi.storage.StorageConfiguration;
 
 import org.apache.avro.JsonProperties;
 import org.apache.avro.LogicalType;
@@ -309,8 +310,9 @@ public class HoodieRealtimeRecordReaderUtils {
   }
 
   public static HoodieFileReader getBaseFileReader(Path path, JobConf conf) 
throws IOException {
-    HoodieConfig hoodieConfig = 
getReaderConfigs(HadoopFSUtils.getStorageConf(conf));
-    return 
HoodieFileReaderFactory.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO)
+    StorageConfiguration<?> storageConf = HadoopFSUtils.getStorageConf(conf);
+    HoodieConfig hoodieConfig = getReaderConfigs(storageConf);
+    return 
HoodieIOFactory.getIOFactory(storageConf).getReaderFactory(HoodieRecord.HoodieRecordType.AVRO)
         .getFileReader(hoodieConfig, HadoopFSUtils.getStorageConf(conf), 
convertToStoragePath(path));
   }
 
diff --git 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
index aa2e277edc9..59e04972692 100644
--- 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
+++ 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
@@ -43,7 +43,7 @@ import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.CloseableMappingIterator;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.io.storage.HoodieAvroFileReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.io.storage.HoodieIOFactory;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
@@ -274,7 +274,8 @@ public class DFSHoodieDatasetInputReader extends 
DFSDeltaInputReader {
     if (fileSlice.getBaseFile().isPresent()) {
       // Read the base files using the latest writer schema.
       Schema schema = HoodieAvroUtils.addMetadataFields(new 
Schema.Parser().parse(schemaStr));
-      HoodieAvroFileReader reader = 
TypeUtils.unsafeCast(HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO)
+      HoodieAvroFileReader reader = 
TypeUtils.unsafeCast(HoodieIOFactory.getIOFactory(metaClient.getStorageConf())
+          .getReaderFactory(HoodieRecordType.AVRO)
           .getFileReader(
               DEFAULT_HUDI_CONFIG_FOR_READER,
               metaClient.getStorageConf(),
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
index c546e662b19..97a6016f873 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -21,7 +21,7 @@ import org.apache.hudi.DataSourceReadOptions._
 import org.apache.hudi.DataSourceWriteOptions.{BOOTSTRAP_OPERATION_OPT_VAL, 
OPERATION, STREAMING_CHECKPOINT_IDENTIFIER}
 import org.apache.hudi.cdc.CDCRelation
 import org.apache.hudi.common.HoodieSchemaNotFoundException
-import org.apache.hudi.common.config.HoodieReaderConfig
+import org.apache.hudi.common.config.{HoodieReaderConfig, HoodieStorageConfig}
 import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE, 
MERGE_ON_READ}
 import org.apache.hudi.common.model.WriteConcurrencyMode
 import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
@@ -31,6 +31,7 @@ import 
org.apache.hudi.config.HoodieBootstrapConfig.DATA_QUERIES_ONLY
 import org.apache.hudi.config.HoodieWriteConfig.WRITE_CONCURRENCY_MODE
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.hadoop.fs.HadoopFSUtils
+import org.apache.hudi.io.storage.HoodieSparkIOFactory
 import org.apache.hudi.storage.{HoodieStorageUtils, StoragePath}
 import org.apache.hudi.util.PathUtils
 
@@ -64,6 +65,9 @@ class DefaultSource extends RelationProvider
       // Enable "passPartitionByAsOptions" to support "write.partitionBy(...)"
       
spark.conf.set("spark.sql.legacy.sources.write.passPartitionByAsOptions", 
"true")
     }
+    // Always use spark io factory
+    
spark.sparkContext.hadoopConfiguration.set(HoodieStorageConfig.HOODIE_IO_FACTORY_CLASS.key(),
+      classOf[HoodieSparkIOFactory].getName)
     // Revisit EMRFS incompatibilities, for now disable
     
spark.sparkContext.hadoopConfiguration.set("fs.s3.metadata.cache.expiration.seconds",
 "0")
   }
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index 10685b624bc..ff70ed7a14c 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -43,7 +43,7 @@ import org.apache.hudi.hadoop.fs.HadoopFSUtils
 import org.apache.hudi.internal.schema.InternalSchema
 import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
 import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
-import org.apache.hudi.io.storage.HoodieFileReaderFactory
+import org.apache.hudi.io.storage.HoodieSparkIOFactory
 import org.apache.hudi.metadata.HoodieTableMetadata
 import org.apache.hudi.storage.{StoragePath, StoragePathInfo}
 import org.apache.avro.Schema
@@ -854,7 +854,7 @@ object HoodieBaseRelation extends SparkAdapterSupport {
       val hoodieConfig = new HoodieConfig()
       hoodieConfig.setValue(USE_NATIVE_HFILE_READER,
         options.getOrElse(USE_NATIVE_HFILE_READER.key(), 
USE_NATIVE_HFILE_READER.defaultValue().toString))
-      val reader = 
HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO)
+      val reader = (new 
HoodieSparkIOFactory).getReaderFactory(HoodieRecordType.AVRO)
         .getFileReader(hoodieConfig, storageConf, filePath, HFILE)
 
       val requiredRowSchema = requiredDataSchema.structTypeSchema
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
index 9f792daabef..283b6167fc4 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
@@ -62,7 +62,6 @@ import org.apache.hudi.exception.HoodieValidationException;
 import org.apache.hudi.exception.TableNotFoundException;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.io.storage.HoodieFileReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.metadata.HoodieTableMetadataUtil;
 import org.apache.hudi.storage.HoodieStorage;
@@ -108,6 +107,7 @@ import static 
org.apache.hudi.common.model.HoodieRecord.PARTITION_PATH_METADATA_
 import static 
org.apache.hudi.common.model.HoodieRecord.RECORD_KEY_METADATA_FIELD;
 import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
 import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
+import static 
org.apache.hudi.io.storage.HoodieSparkIOFactory.getHoodieSparkIOFactory;
 import static 
org.apache.hudi.metadata.HoodieTableMetadata.getMetadataTableBasePath;
 
 /**
@@ -1487,7 +1487,7 @@ public class HoodieMetadataTableValidator implements 
Serializable {
       HoodieConfig hoodieConfig = new HoodieConfig();
       hoodieConfig.setValue(HoodieReaderConfig.USE_NATIVE_HFILE_READER,
           Boolean.toString(ConfigUtils.getBooleanWithAltKeys(props, 
HoodieReaderConfig.USE_NATIVE_HFILE_READER)));
-      try (HoodieFileReader fileReader = 
HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO)
+      try (HoodieFileReader fileReader = 
getHoodieSparkIOFactory().getReaderFactory(HoodieRecordType.AVRO)
           .getFileReader(hoodieConfig, metaClient.getStorageConf(), path)) {
         bloomFilter = fileReader.readBloomFilter();
         if (bloomFilter == null) {

Reply via email to