This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 63ad11f54bb [HUDI-7797] Use HoodieIOFactory to return pluggable FileFormatUtils implementation (#11302)
63ad11f54bb is described below

commit 63ad11f54bbb552ec98648b2b61a3fdcf99dcbf3
Author: Y Ethan Guo <[email protected]>
AuthorDate: Sat May 25 17:52:23 2024 -0700

    [HUDI-7797] Use HoodieIOFactory to return pluggable FileFormatUtils implementation (#11302)
---
 .../hudi/io/HoodieKeyLocationFetchHandle.java      |  4 +-
 .../testutils/HoodieJavaClientTestHarness.java     | 14 +++--
 .../hudi/io/storage/HoodieSparkParquetReader.java  |  3 +-
 .../hudi/client/TestUpdateSchemaEvolution.java     |  5 +-
 .../commit/TestCopyOnWriteActionExecutor.java      |  7 +--
 .../hudi/testutils/HoodieClientTestBase.java       |  4 +-
 .../hudi/common/model/HoodiePartitionMetadata.java |  7 ++-
 .../hudi/common/table/TableSchemaResolver.java     |  8 +--
 .../table/log/block/HoodieHFileDataBlock.java      |  8 +--
 .../table/log/block/HoodieParquetDataBlock.java    |  6 +--
 .../apache/hudi/common/util/FileFormatUtils.java   | 26 ---------
 .../apache/hudi/io/storage/HoodieIOFactory.java    | 56 +++++++++++++++++++-
 .../hudi/metadata/HoodieTableMetadataUtil.java     |  3 +-
 .../hudi/sink/bootstrap/BootstrapOperator.java     |  4 +-
 .../apache/hudi/io/hadoop/HoodieAvroOrcReader.java |  3 +-
 .../hudi/io/hadoop/HoodieAvroParquetReader.java    |  4 +-
 .../hudi/io/hadoop/HoodieHadoopIOFactory.java      | 19 +++++++
 .../hudi/io/hadoop/TestHoodieHadoopIOFactory.java  | 61 ++++++++++++++++++++++
 .../org/apache/spark/sql/hudi/SparkHelpers.scala   | 12 +++--
 .../org/apache/hudi/ColumnStatsIndexHelper.java    |  4 +-
 .../utilities/HoodieMetadataTableValidator.java    | 11 ++--
 21 files changed, 202 insertions(+), 67 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java
index 835f102f414..a6245fa8950 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java
@@ -26,6 +26,7 @@ import org.apache.hudi.common.util.FileFormatUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.io.storage.HoodieIOFactory;
 import org.apache.hudi.keygen.BaseKeyGenerator;
 import org.apache.hudi.table.HoodieTable;
 
@@ -50,7 +51,8 @@ public class HoodieKeyLocationFetchHandle<T, I, K, O> extends 
HoodieReadHandle<T
   }
 
   private List<Pair<HoodieKey, Long>> 
fetchRecordKeysWithPositions(HoodieBaseFile baseFile) {
-    FileFormatUtils fileFormatUtils = 
FileFormatUtils.getInstance(baseFile.getStoragePath());
+    FileFormatUtils fileFormatUtils = 
HoodieIOFactory.getIOFactory(hoodieTable.getStorage())
+        .getFileFormatUtils(baseFile.getStoragePath());
     if (keyGeneratorOpt.isPresent()) {
       return 
fileFormatUtils.fetchRecordKeysWithPositions(hoodieTable.getStorage(), 
baseFile.getStoragePath(), keyGeneratorOpt);
     } else {
diff --git 
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java
 
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java
index ba1d71845f4..520e5ba0a1f 100644
--- 
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java
+++ 
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java
@@ -62,6 +62,7 @@ import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieMetadataException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.index.JavaHoodieIndexFactory;
+import org.apache.hudi.io.storage.HoodieIOFactory;
 import org.apache.hudi.metadata.FileSystemBackedTableMetadata;
 import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
 import org.apache.hudi.metadata.HoodieTableMetadata;
@@ -909,7 +910,8 @@ public abstract class HoodieJavaClientTestHarness extends 
HoodieWriterClientTest
       HashMap<String, String> paths =
           getLatestFileIDsToFullPath(basePath, commitTimeline, 
Arrays.asList(commitInstant));
       return paths.values().stream().map(StoragePath::new).flatMap(path ->
-              FileFormatUtils.getInstance(path).readAvroRecords(storage, 
path).stream())
+              HoodieIOFactory.getIOFactory(storage).getFileFormatUtils(path)
+                  .readAvroRecords(storage, path).stream())
           .filter(record -> {
             if (filterByCommitTime) {
               Object commitTime = 
record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD);
@@ -938,7 +940,7 @@ public abstract class HoodieJavaClientTestHarness extends 
HoodieWriterClientTest
     try {
       List<HoodieBaseFile> latestFiles = getLatestBaseFiles(basePath, storage, 
paths);
       return latestFiles.stream().mapToLong(baseFile ->
-              FileFormatUtils.getInstance(baseFile.getStoragePath())
+              
HoodieIOFactory.getIOFactory(storage).getFileFormatUtils(baseFile.getStoragePath())
                   .readAvroRecords(storage, baseFile.getStoragePath()).size())
           .sum();
     } catch (Exception e) {
@@ -976,8 +978,9 @@ public abstract class HoodieJavaClientTestHarness extends 
HoodieWriterClientTest
       HashMap<String, String> fileIdToFullPath = 
getLatestFileIDsToFullPath(basePath, commitTimeline, commitsToReturn);
       String[] paths = fileIdToFullPath.values().toArray(new 
String[fileIdToFullPath.size()]);
       if (paths[0].endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
-        return Arrays.stream(paths).map(StoragePath::new).flatMap(path -> 
FileFormatUtils.getInstance(path)
-                .readAvroRecords(storage, path).stream())
+        return Arrays.stream(paths).map(StoragePath::new).flatMap(path ->
+                HoodieIOFactory.getIOFactory(storage).getFileFormatUtils(path)
+                    .readAvroRecords(storage, path).stream())
             .filter(record -> {
               if (lastCommitTimeOpt.isPresent()) {
                 Object commitTime = 
record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD);
@@ -1025,7 +1028,8 @@ public abstract class HoodieJavaClientTestHarness extends 
HoodieWriterClientTest
   }
 
   public static FileFormatUtils getFileUtilsInstance(HoodieTableMetaClient 
metaClient) {
-    return 
FileFormatUtils.getInstance(metaClient.getTableConfig().getBaseFileFormat());
+    return HoodieIOFactory.getIOFactory(metaClient.getStorage())
+        .getFileFormatUtils(metaClient.getTableConfig().getBaseFileFormat());
   }
 
   protected HoodieTableMetaClient createMetaClient() {
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java
index 4dc60c07283..960abb05370 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java
@@ -69,7 +69,8 @@ public class HoodieSparkParquetReader implements 
HoodieSparkFileReader {
     this.storage = storage.newInstance(path, storage.getConf().newInstance());
     // Avoid adding record in list element when convert parquet schema to avro 
schema
     this.storage.getConf().set(ADD_LIST_ELEMENT_RECORDS, "false");
-    this.parquetUtils = FileFormatUtils.getInstance(HoodieFileFormat.PARQUET);
+    this.parquetUtils = HoodieIOFactory.getIOFactory(storage)
+        .getFileFormatUtils(HoodieFileFormat.PARQUET);
   }
 
   @Override
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
index d8b7710d5c8..820ac2754c9 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
@@ -28,7 +28,6 @@ import 
org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.testutils.InProcessTimeGenerator;
 import org.apache.hudi.common.testutils.RawTripTestPayload;
-import org.apache.hudi.common.util.FileFormatUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieUpsertException;
@@ -36,6 +35,7 @@ import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.io.CreateHandleFactory;
 import org.apache.hudi.io.HoodieMergeHandle;
 import org.apache.hudi.io.HoodieWriteHandle;
+import org.apache.hudi.io.storage.HoodieIOFactory;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.table.HoodieSparkTable;
 import org.apache.hudi.testutils.HoodieSparkClientTestHarness;
@@ -134,7 +134,8 @@ public class TestUpdateSchemaEvolution extends 
HoodieSparkClientTestHarness impl
       Executable executable = () -> {
         HoodieMergeHandle mergeHandle = new 
HoodieMergeHandle(updateTable.getConfig(), "101", updateTable,
             updateRecords.iterator(), updateRecords.get(0).getPartitionPath(), 
insertResult.getFileId(), supplier, Option.empty());
-        List<GenericRecord> oldRecords = 
FileFormatUtils.getInstance(updateTable.getBaseFileFormat())
+        List<GenericRecord> oldRecords = 
HoodieIOFactory.getIOFactory(updateTable.getStorage())
+            .getFileFormatUtils(updateTable.getBaseFileFormat())
             .readAvroRecords(updateTable.getStorage(),
                 new StoragePath(updateTable.getConfig().getBasePath() + "/" + 
insertResult.getStat().getPath()),
                 mergeHandle.getWriterSchemaWithMetaFields());
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java
index af69b13c6be..f1628316a49 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java
@@ -36,7 +36,6 @@ import 
org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.testutils.RawTripTestPayload;
 import org.apache.hudi.common.testutils.Transformations;
-import org.apache.hudi.common.util.FileFormatUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieIndexConfig;
@@ -47,6 +46,7 @@ import org.apache.hudi.hadoop.HoodieParquetInputFormat;
 import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.io.HoodieCreateHandle;
+import org.apache.hudi.io.storage.HoodieIOFactory;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.table.HoodieSparkCopyOnWriteTable;
@@ -205,14 +205,15 @@ public class TestCopyOnWriteActionExecutor extends 
HoodieClientTestBase implemen
 
     // Read out the bloom filter and make sure filter can answer record exist 
or not
     Path filePath = allFiles[0].getPath();
-    BloomFilter filter = FileFormatUtils.getInstance(table.getBaseFileFormat())
+    BloomFilter filter = 
HoodieIOFactory.getIOFactory(storage).getFileFormatUtils(table.getBaseFileFormat())
         .readBloomFilterFromMetadata(storage, new 
StoragePath(filePath.toUri()));
     for (HoodieRecord record : records) {
       assertTrue(filter.mightContain(record.getRecordKey()));
     }
 
     // Read the base file, check the record content
-    List<GenericRecord> fileRecords = 
FileFormatUtils.getInstance(table.getBaseFileFormat())
+    List<GenericRecord> fileRecords = HoodieIOFactory.getIOFactory(storage)
+        .getFileFormatUtils(table.getBaseFileFormat())
         .readAvroRecords(storage, new StoragePath(filePath.toUri()));
     GenericRecord newRecord;
     int index = 0;
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
index 81b7c6651d9..2fceb1057da 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
@@ -37,6 +37,7 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.index.SparkHoodieIndexFactory;
+import org.apache.hudi.io.storage.HoodieIOFactory;
 import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.table.HoodieSparkTable;
 
@@ -640,6 +641,7 @@ public class HoodieClientTestBase extends 
HoodieSparkClientTestHarness {
   }
 
   public static FileFormatUtils getFileUtilsInstance(HoodieTableMetaClient 
metaClient) {
-    return 
FileFormatUtils.getInstance(metaClient.getTableConfig().getBaseFileFormat());
+    return HoodieIOFactory.getIOFactory(metaClient.getStorage())
+        .getFileFormatUtils(metaClient.getTableConfig().getBaseFileFormat());
   }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java
index 9256e6f4440..16fd7d2f434 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.util.RetryHelper;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.io.storage.HoodieIOFactory;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;
 
@@ -137,7 +138,8 @@ public class HoodiePartitionMetadata {
         HOODIE_PARTITION_METAFILE_PREFIX + "_" + UUID.randomUUID() + 
getMetafileExtension());
     try {
       // write to temporary file
-      FileFormatUtils.getInstance(format).writeMetaFile(storage, tmpPath, 
props);
+      HoodieIOFactory.getIOFactory(storage).getFileFormatUtils(format)
+          .writeMetaFile(storage, tmpPath, props);
       // move to actual path
       storage.rename(tmpPath, filePath);
     } finally {
@@ -185,7 +187,8 @@ public class HoodiePartitionMetadata {
   private boolean readBaseFormatMetaFile() {
     for (StoragePath metafilePath : baseFormatMetaFilePaths(partitionPath)) {
       try {
-        FileFormatUtils reader = FileFormatUtils.getInstance(metafilePath);
+        FileFormatUtils reader = HoodieIOFactory.getIOFactory(storage)
+            .getFileFormatUtils(metafilePath);
         // Data file format
         Map<String, String> metadata = reader.readFooter(
             storage, true, metafilePath, PARTITION_DEPTH_KEY, COMMIT_TIME_KEY);
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
index 5c5d6133e96..0c84d2665dd 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
@@ -32,7 +32,6 @@ import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.util.FileFormatUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
@@ -43,6 +42,7 @@ import org.apache.hudi.internal.schema.HoodieSchemaException;
 import org.apache.hudi.internal.schema.InternalSchema;
 import 
org.apache.hudi.internal.schema.io.FileBasedInternalSchemaStorageManager;
 import org.apache.hudi.internal.schema.utils.SerDeHelper;
+import org.apache.hudi.io.storage.HoodieIOFactory;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.util.Lazy;
@@ -306,7 +306,8 @@ public class TableSchemaResolver {
         .orElseThrow(() -> new IllegalArgumentException("Could not find any 
data file written for compaction "
             + lastCompactionCommit + ", could not get schema for table " + 
metaClient.getBasePath()));
     StoragePath path = new StoragePath(filePath);
-    return 
FileFormatUtils.getInstance(path).readAvroSchema(metaClient.getStorage(), path);
+    return HoodieIOFactory.getIOFactory(metaClient.getStorage())
+        .getFileFormatUtils(path).readAvroSchema(metaClient.getStorage(), 
path);
   }
 
   private Schema readSchemaFromLogFile(StoragePath path) throws IOException {
@@ -473,7 +474,8 @@ public class TableSchemaResolver {
         // this is a log file
         schema = readSchemaFromLogFile(filePath);
       } else {
-        schema = 
FileFormatUtils.getInstance(filePath).readAvroSchema(metaClient.getStorage(), 
filePath);
+        schema = HoodieIOFactory.getIOFactory(metaClient.getStorage())
+            
.getFileFormatUtils(filePath).readAvroSchema(metaClient.getStorage(), filePath);
       }
     }
     return schema;
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
index db42a93a274..d917c554fb8 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
@@ -25,7 +25,6 @@ import org.apache.hudi.common.engine.HoodieReaderContext;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
-import org.apache.hudi.common.util.FileFormatUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.CloseableMappingIterator;
@@ -105,9 +104,10 @@ public class HoodieHFileDataBlock extends HoodieDataBlock {
   protected byte[] serializeRecords(List<HoodieRecord> records, HoodieStorage 
storage) throws IOException {
     Schema writerSchema = new Schema.Parser().parse(
         
super.getLogBlockHeader().get(HoodieLogBlock.HeaderMetadataType.SCHEMA));
-    return 
FileFormatUtils.getInstance(HoodieFileFormat.HFILE).serializeRecordsToLogBlock(
-        storage, records, writerSchema, getSchema(), getKeyFieldName(),
-        Collections.singletonMap(HFILE_COMPRESSION_ALGORITHM_NAME.key(), 
compressionCodec.get()));
+    return 
HoodieIOFactory.getIOFactory(storage).getFileFormatUtils(HoodieFileFormat.HFILE)
+        .serializeRecordsToLogBlock(
+            storage, records, writerSchema, getSchema(), getKeyFieldName(),
+            Collections.singletonMap(HFILE_COMPRESSION_ALGORITHM_NAME.key(), 
compressionCodec.get()));
   }
 
   @Override
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
index 6c17d9abf49..fe0c8fb618e 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
@@ -21,7 +21,6 @@ package org.apache.hudi.common.table.log.block;
 import org.apache.hudi.common.engine.HoodieReaderContext;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
-import org.apache.hudi.common.util.FileFormatUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.io.SeekableDataInputStream;
@@ -98,8 +97,9 @@ public class HoodieParquetDataBlock extends HoodieDataBlock {
     Schema writerSchema = new Schema.Parser().parse(
         
super.getLogBlockHeader().get(HoodieLogBlock.HeaderMetadataType.SCHEMA));
 
-    return FileFormatUtils.getInstance(PARQUET).serializeRecordsToLogBlock(
-        storage, records, writerSchema, getSchema(), getKeyFieldName(), 
paramsMap);
+    return HoodieIOFactory.getIOFactory(storage).getFileFormatUtils(PARQUET)
+        .serializeRecordsToLogBlock(
+            storage, records, writerSchema, getSchema(), getKeyFieldName(), 
paramsMap);
   }
 
   /**
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/FileFormatUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/common/util/FileFormatUtils.java
index 468a0df68ee..1790ce8675b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/FileFormatUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/FileFormatUtils.java
@@ -50,32 +50,6 @@ import java.util.stream.Collectors;
  * Utils for file format used in Hudi.
  */
 public abstract class FileFormatUtils {
-  public static final String PARQUET_UTILS = 
"org.apache.hudi.common.util.ParquetUtils";
-  public static final String ORC_UTILS = 
"org.apache.hudi.common.util.OrcUtils";
-  public static final String HFILE_UTILS = 
"org.apache.hudi.common.util.HFileUtils";
-
-  public static FileFormatUtils getInstance(StoragePath path) {
-    if 
(path.getFileExtension().equals(HoodieFileFormat.PARQUET.getFileExtension())) {
-      return ReflectionUtils.loadClass(PARQUET_UTILS);
-    } else if 
(path.getFileExtension().equals(HoodieFileFormat.ORC.getFileExtension())) {
-      return ReflectionUtils.loadClass(ORC_UTILS);
-    } else if 
(path.getFileExtension().equals(HoodieFileFormat.HFILE.getFileExtension())) {
-      return ReflectionUtils.loadClass(HFILE_UTILS);
-    }
-    throw new UnsupportedOperationException("The format for file " + path + " 
is not supported yet.");
-  }
-
-  public static FileFormatUtils getInstance(HoodieFileFormat fileFormat) {
-    if (HoodieFileFormat.PARQUET.equals(fileFormat)) {
-      return ReflectionUtils.loadClass(PARQUET_UTILS);
-    } else if (HoodieFileFormat.ORC.equals(fileFormat)) {
-      return ReflectionUtils.loadClass(ORC_UTILS);
-    } else if (HoodieFileFormat.HFILE.equals(fileFormat)) {
-      return ReflectionUtils.loadClass(HFILE_UTILS);
-    }
-    throw new UnsupportedOperationException(fileFormat.name() + " format not 
supported yet.");
-  }
-
   /**
    * Aggregate column range statistics across files in a partition.
    *
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieIOFactory.java 
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieIOFactory.java
index cba3c7b0e98..e1cff2a0424 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieIOFactory.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieIOFactory.java
@@ -19,17 +19,23 @@
 
 package org.apache.hudi.io.storage;
 
+import org.apache.hudi.ApiMaturityLevel;
+import org.apache.hudi.PublicAPIClass;
+import org.apache.hudi.PublicAPIMethod;
 import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.fs.ConsistencyGuard;
+import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.FileFormatUtils;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;
 
 /**
- * Base class to get HoodieFileReaderFactory and HoodieFileWriterFactory
+ * Base class to get {@link HoodieFileReaderFactory}, {@link 
HoodieFileWriterFactory}, and {@link FileFormatUtils}
  */
+@PublicAPIClass(maturity = ApiMaturityLevel.EVOLVING)
 public abstract class HoodieIOFactory {
   protected final HoodieStorage storage;
 
@@ -48,12 +54,45 @@ public abstract class HoodieIOFactory {
     }
   }
 
+  /**
+   * @param recordType {@link HoodieRecord} type.
+   * @return a factory to create file readers.
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
   public abstract HoodieFileReaderFactory 
getReaderFactory(HoodieRecord.HoodieRecordType recordType);
 
+  /**
+   * @param recordType {@link HoodieRecord} type.
+   * @return a factory to create file writers.
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
   public abstract HoodieFileWriterFactory 
getWriterFactory(HoodieRecord.HoodieRecordType recordType);
 
+  /**
+   * @param fileFormat file format supported in Hudi.
+   * @return a util class to support read and write in the file format.
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  public abstract FileFormatUtils getFileFormatUtils(HoodieFileFormat 
fileFormat);
+
+  /**
+   * @param storagePath file path.
+   * @return {@link HoodieStorage} instance.
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
   public abstract HoodieStorage getStorage(StoragePath storagePath);
 
+  /**
+   * @param path                   file path.
+   * @param enableRetry            whether to retry operations.
+   * @param maxRetryIntervalMs     maximum retry interval in milliseconds.
+   * @param maxRetryNumbers        maximum number of retries.
+   * @param initialRetryIntervalMs initial delay before retry in milliseconds.
+   * @param retryExceptions        retry exception list.
+   * @param consistencyGuard       {@link ConsistencyGuard} instance.
+   * @return {@link HoodieStorage} instance with retry capability if 
applicable.
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
   public abstract HoodieStorage getStorage(StoragePath path,
                                            boolean enableRetry,
                                            long maxRetryIntervalMs,
@@ -61,4 +100,19 @@ public abstract class HoodieIOFactory {
                                            long initialRetryIntervalMs,
                                            String retryExceptions,
                                            ConsistencyGuard consistencyGuard);
+
+  /**
+   * @param path file path.
+   * @return a util class to support read and write in the file format.
+   */
+  public final FileFormatUtils getFileFormatUtils(StoragePath path) {
+    if 
(path.getFileExtension().equals(HoodieFileFormat.PARQUET.getFileExtension())) {
+      return getFileFormatUtils(HoodieFileFormat.PARQUET);
+    } else if 
(path.getFileExtension().equals(HoodieFileFormat.ORC.getFileExtension())) {
+      return getFileFormatUtils(HoodieFileFormat.ORC);
+    } else if 
(path.getFileExtension().equals(HoodieFileFormat.HFILE.getFileExtension())) {
+      return getFileFormatUtils(HoodieFileFormat.HFILE);
+    }
+    throw new UnsupportedOperationException("The format for file " + path + " 
is not supported yet.");
+  }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 60bfaa3c878..0c46a650a7d 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -1213,7 +1213,8 @@ public class HoodieTableMetadataUtil {
     try {
       if (filePath.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
         StoragePath fullFilePath = new 
StoragePath(datasetMetaClient.getBasePathV2(), filePath);
-        return FileFormatUtils.getInstance(HoodieFileFormat.PARQUET)
+        return HoodieIOFactory.getIOFactory(datasetMetaClient.getStorage())
+            .getFileFormatUtils(HoodieFileFormat.PARQUET)
             .readColumnStatsFromMetadata(datasetMetaClient.getStorage(), 
fullFilePath, columnsToIndex);
       }
 
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
index 0ab008ee0ac..af7b9fbbf0c 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
@@ -38,6 +38,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.HadoopConfigurations;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.io.storage.HoodieIOFactory;
 import org.apache.hudi.sink.bootstrap.aggregate.BootstrapAggFunction;
 import org.apache.hudi.sink.meta.CkpMetadata;
 import org.apache.hudi.sink.meta.CkpMetadataFactory;
@@ -202,7 +203,8 @@ public class BootstrapOperator<I, O extends HoodieRecord<?>>
     Option<HoodieInstant> latestCommitTime = 
commitsTimeline.filterCompletedAndCompactionInstants().lastInstant();
 
     if (latestCommitTime.isPresent()) {
-      FileFormatUtils fileUtils = 
FileFormatUtils.getInstance(this.hoodieTable.getBaseFileFormat());
+      FileFormatUtils fileUtils = 
HoodieIOFactory.getIOFactory(hoodieTable.getStorage())
+          .getFileFormatUtils(hoodieTable.getBaseFileFormat());
       Schema schema = new 
TableSchemaResolver(this.hoodieTable.getMetaClient()).getTableAvroSchema();
 
       List<FileSlice> fileSlices = this.hoodieTable.getSliceView()
diff --git 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroOrcReader.java
 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroOrcReader.java
index 2c603839dc8..92b7078048f 100644
--- 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroOrcReader.java
+++ 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroOrcReader.java
@@ -28,6 +28,7 @@ import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.io.storage.HoodieAvroFileReader;
 import org.apache.hudi.io.storage.HoodieFileReader;
+import org.apache.hudi.io.storage.HoodieIOFactory;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;
 
@@ -58,7 +59,7 @@ public class HoodieAvroOrcReader extends HoodieAvroFileReader 
{
   public HoodieAvroOrcReader(HoodieStorage storage, StoragePath path) {
     this.storage = storage;
     this.path = path;
-    this.orcUtils = FileFormatUtils.getInstance(HoodieFileFormat.ORC);
+    this.orcUtils = 
HoodieIOFactory.getIOFactory(storage).getFileFormatUtils(HoodieFileFormat.ORC);
   }
 
   @Override
diff --git 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReader.java
 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReader.java
index c1f7f7cb4b4..dfbf4801687 100644
--- 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReader.java
+++ 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReader.java
@@ -32,6 +32,7 @@ import org.apache.hudi.common.util.collection.CloseableMappingIterator;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.io.storage.HoodieAvroFileReader;
 import org.apache.hudi.io.storage.HoodieFileReader;
+import org.apache.hudi.io.storage.HoodieIOFactory;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
@@ -68,7 +69,8 @@ public class HoodieAvroParquetReader extends HoodieAvroFileReader {
     // by the Reader (for proper config propagation to Parquet components)
     this.storage = storage.newInstance(path, tryOverrideDefaultConfigs(storage.getConf().newInstance()));
     this.path = path;
-    this.parquetUtils = FileFormatUtils.getInstance(HoodieFileFormat.PARQUET);
+    this.parquetUtils = HoodieIOFactory.getIOFactory(storage)
+        .getFileFormatUtils(HoodieFileFormat.PARQUET);
   }
 
   @Override
diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieHadoopIOFactory.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieHadoopIOFactory.java
index 4203fe90b4b..3b32d67a7f9 100644
--- a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieHadoopIOFactory.java
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieHadoopIOFactory.java
@@ -20,7 +20,12 @@
 package org.apache.hudi.io.hadoop;
 
 import org.apache.hudi.common.fs.ConsistencyGuard;
+import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.FileFormatUtils;
+import org.apache.hudi.common.util.HFileUtils;
+import org.apache.hudi.common.util.OrcUtils;
+import org.apache.hudi.common.util.ParquetUtils;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.io.storage.HoodieFileReaderFactory;
@@ -79,6 +84,20 @@ public class HoodieHadoopIOFactory extends HoodieIOFactory {
     }
   }
 
+  @Override
+  public FileFormatUtils getFileFormatUtils(HoodieFileFormat fileFormat) {
+    switch (fileFormat) {
+      case PARQUET:
+        return new ParquetUtils();
+      case ORC:
+        return new OrcUtils();
+      case HFILE:
+        return new HFileUtils();
+      default:
+        throw new UnsupportedOperationException(fileFormat.name() + " format not supported yet.");
+    }
+  }
+
   @Override
   public HoodieStorage getStorage(StoragePath storagePath) {
     return storage.newInstance(storagePath, storage.getConf());
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHadoopIOFactory.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHadoopIOFactory.java
new file mode 100644
index 00000000000..86341b3c963
--- /dev/null
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHadoopIOFactory.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io.hadoop;
+
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.util.HFileUtils;
+import org.apache.hudi.common.util.OrcUtils;
+import org.apache.hudi.common.util.ParquetUtils;
+import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.io.storage.HoodieIOFactory;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
+
+import org.junit.jupiter.api.Test;
+
+import static org.apache.hudi.common.testutils.HoodieTestUtils.getDefaultStorageConf;
+import static org.apache.hudi.storage.HoodieStorageUtils.DEFAULT_URI;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+
+/**
+ * Tests {@link HoodieHadoopIOFactory}
+ */
+public class TestHoodieHadoopIOFactory {
+  @Test
+  public void testGetFileFormatUtils() {
+    HoodieIOFactory ioFactory = new HoodieHadoopIOFactory(
+        new HoodieHadoopStorage(HadoopFSUtils.getFs(DEFAULT_URI, getDefaultStorageConf())));
+    assertTrue(ioFactory.getFileFormatUtils(new StoragePath("file:///a/b.parquet")) instanceof ParquetUtils);
+    assertTrue(ioFactory.getFileFormatUtils(new StoragePath("file:///a/b.orc")) instanceof OrcUtils);
+    assertTrue(ioFactory.getFileFormatUtils(new StoragePath("file:///a/b.hfile")) instanceof HFileUtils);
+    assertThrows(
+        UnsupportedOperationException.class,
+        () -> ioFactory.getFileFormatUtils(new StoragePath("file:///a/b.log")));
+
+    assertTrue(ioFactory.getFileFormatUtils(HoodieFileFormat.PARQUET) instanceof ParquetUtils);
+    assertTrue(ioFactory.getFileFormatUtils(HoodieFileFormat.ORC) instanceof OrcUtils);
+    assertTrue(ioFactory.getFileFormatUtils(HoodieFileFormat.HFILE) instanceof HFileUtils);
+    assertThrows(
+        UnsupportedOperationException.class,
+        () -> ioFactory.getFileFormatUtils(HoodieFileFormat.HOODIE_LOG));
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/SparkHelpers.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/SparkHelpers.scala
index e534a13d766..246c266d467 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/SparkHelpers.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/SparkHelpers.scala
@@ -23,9 +23,9 @@ import org.apache.hudi.common.bloom.{BloomFilter, BloomFilterFactory}
 import org.apache.hudi.common.config.HoodieStorageConfig
 import org.apache.hudi.common.config.HoodieStorageConfig.{BLOOM_FILTER_DYNAMIC_MAX_ENTRIES, BLOOM_FILTER_FPP_VALUE, BLOOM_FILTER_NUM_ENTRIES_VALUE, BLOOM_FILTER_TYPE}
 import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord}
-import org.apache.hudi.common.util.{FileFormatUtils, Option}
+import org.apache.hudi.common.util.Option
 import org.apache.hudi.io.hadoop.HoodieAvroParquetWriter
-import org.apache.hudi.io.storage.HoodieParquetConfig
+import org.apache.hudi.io.storage.{HoodieIOFactory, HoodieParquetConfig}
 import org.apache.hudi.storage.{HoodieStorage, StorageConfiguration, StoragePath}
 
 import org.apache.avro.Schema
@@ -48,7 +48,9 @@ object SparkHelpers {
                               sourceFile: StoragePath,
                               destinationFile: StoragePath,
                               keysToSkip: Set[String]) {
-    val sourceRecords = FileFormatUtils.getInstance(HoodieFileFormat.PARQUET).readAvroRecords(storage, sourceFile).asScala
+    val sourceRecords = HoodieIOFactory.getIOFactory(storage)
+      .getFileFormatUtils(HoodieFileFormat.PARQUET)
+      .readAvroRecords(storage, sourceFile).asScala
     val schema: Schema = sourceRecords.head.getSchema
     val filter: BloomFilter = BloomFilterFactory.createBloomFilter(
       BLOOM_FILTER_NUM_ENTRIES_VALUE.defaultValue.toInt, BLOOM_FILTER_FPP_VALUE.defaultValue.toDouble,
@@ -140,7 +142,9 @@ class SparkHelper(sqlContext: SQLContext, fs: FileSystem) {
    * @return <pre>true</pre>  if all keys are added to the bloom filter;  <pre>false</pre>  otherwise.
    */
   def fileKeysAgainstBF(storage: HoodieStorage, sqlContext: SQLContext, file: String): Boolean = {
-    val bf = FileFormatUtils.getInstance(HoodieFileFormat.PARQUET).readBloomFilterFromMetadata(storage, new StoragePath(file))
+    val bf = HoodieIOFactory.getIOFactory(storage)
+      .getFileFormatUtils(HoodieFileFormat.PARQUET)
+      .readBloomFilterFromMetadata(storage, new StoragePath(file))
     val foundCount = sqlContext.parquetFile(file)
       .select(s"`${HoodieRecord.RECORD_KEY_METADATA_FIELD}`")
       .collect().count(r => !bf.mightContain(r.getString(0)))
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/ColumnStatsIndexHelper.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/ColumnStatsIndexHelper.java
index 6653c9cf969..357200f5f0e 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/ColumnStatsIndexHelper.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/ColumnStatsIndexHelper.java
@@ -18,8 +18,6 @@
 package org.apache.hudi;
 
 import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
-import org.apache.hudi.common.model.HoodieFileFormat;
-import org.apache.hudi.common.util.FileFormatUtils;
 import org.apache.hudi.common.util.ParquetUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
@@ -176,7 +174,7 @@ public class ColumnStatsIndexHelper {
       colMinMaxInfos =
           jsc.parallelize(baseFilesPaths, numParallelism)
               .mapPartitions(paths -> {
-                ParquetUtils utils = (ParquetUtils) FileFormatUtils.getInstance(HoodieFileFormat.PARQUET);
+                ParquetUtils utils = new ParquetUtils();
                 Iterable<String> iterable = () -> paths;
                 return StreamSupport.stream(iterable.spliterator(), false)
                     .flatMap(path -> {
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
index ca7b92af836..922d920c67e 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
@@ -62,6 +62,7 @@ import org.apache.hudi.exception.HoodieValidationException;
 import org.apache.hudi.exception.TableNotFoundException;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.io.storage.HoodieFileReader;
+import org.apache.hudi.io.storage.HoodieIOFactory;
 import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.metadata.HoodieTableMetadataUtil;
 import org.apache.hudi.metadata.MetadataPartitionType;
@@ -1439,11 +1440,13 @@ public class HoodieMetadataTableValidator implements Serializable {
             .sorted(new HoodieColumnRangeMetadataComparator())
             .collect(Collectors.toList());
       } else {
+        FileFormatUtils formatUtils = HoodieIOFactory.getIOFactory(metaClient.getStorage())
+            .getFileFormatUtils(HoodieFileFormat.PARQUET);
         return baseFileNameList.stream().flatMap(filename ->
-                FileFormatUtils.getInstance(HoodieFileFormat.PARQUET).readColumnStatsFromMetadata(
-                    metaClient.getStorage(),
-                    new StoragePath(FSUtils.constructAbsolutePath(metaClient.getBasePathV2(), partitionPath), filename),
-                    allColumnNameList).stream())
+                formatUtils.readColumnStatsFromMetadata(
+                        metaClient.getStorage(),
+                        new StoragePath(FSUtils.constructAbsolutePath(metaClient.getBasePathV2(), partitionPath), filename),
+                        allColumnNameList).stream())
             .sorted(new HoodieColumnRangeMetadataComparator())
             .collect(Collectors.toList());
       }


Reply via email to