This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/branch-0.x by this push:
new dac73ab6121 [HUDI-7797] Use HoodieIOFactory to return pluggable
FileFormatUtils implementation (#11310)
dac73ab6121 is described below
commit dac73ab61214e12ae50cd111220476283cccf6d1
Author: Y Ethan Guo <[email protected]>
AuthorDate: Sun May 26 00:34:12 2024 -0700
[HUDI-7797] Use HoodieIOFactory to return pluggable FileFormatUtils
implementation (#11310)
---
.../hudi/io/HoodieKeyLocationFetchHandle.java | 4 +-
.../client/TestHoodieJavaWriteClientInsert.java | 7 ++-
.../TestHoodieJavaClientOnCopyOnWriteStorage.java | 6 +-
.../commit/TestJavaCopyOnWriteActionExecutor.java | 7 ++-
.../testutils/HoodieJavaClientTestHarness.java | 12 ++--
.../hudi/io/storage/HoodieSparkParquetReader.java | 3 +-
.../hudi/client/TestUpdateSchemaEvolution.java | 5 +-
.../TestHoodieClientOnCopyOnWriteStorage.java | 24 +++++---
.../commit/TestCopyOnWriteActionExecutor.java | 12 ++--
.../hudi/common/model/HoodiePartitionMetadata.java | 7 ++-
.../hudi/common/table/TableSchemaResolver.java | 8 ++-
.../table/log/block/HoodieHFileDataBlock.java | 8 +--
.../table/log/block/HoodieParquetDataBlock.java | 6 +-
.../apache/hudi/common/util/FileFormatUtils.java | 31 ----------
.../apache/hudi/io/storage/HoodieIOFactory.java | 56 +++++++++++++++++-
.../hudi/metadata/HoodieTableMetadataUtil.java | 4 +-
.../hudi/sink/bootstrap/BootstrapOperator.java | 4 +-
.../apache/hudi/io/hadoop/HoodieAvroOrcReader.java | 3 +-
.../hudi/io/hadoop/HoodieAvroParquetReader.java | 4 +-
.../hudi/io/hadoop/HoodieHadoopIOFactory.java | 19 +++++++
.../hudi/io/hadoop/TestHoodieHadoopIOFactory.java | 66 ++++++++++++++++++++++
.../org/apache/spark/sql/hudi/SparkHelpers.scala | 12 ++--
.../org/apache/hudi/ColumnStatsIndexHelper.java | 4 +-
.../utilities/HoodieMetadataTableValidator.java | 11 ++--
24 files changed, 236 insertions(+), 87 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java
index 4d82d661f64..c94e30c9d5c 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java
@@ -26,6 +26,7 @@ import org.apache.hudi.common.util.FileFormatUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.io.storage.HoodieIOFactory;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.table.HoodieTable;
@@ -50,7 +51,8 @@ public class HoodieKeyLocationFetchHandle<T, I, K, O> extends
HoodieReadHandle<T
}
private List<HoodieKey> fetchHoodieKeys(HoodieBaseFile baseFile) {
- FileFormatUtils fileFormatUtils =
FileFormatUtils.getInstance(baseFile.getStoragePath());
+ FileFormatUtils fileFormatUtils =
HoodieIOFactory.getIOFactory(hoodieTable.getStorage())
+ .getFileFormatUtils(baseFile.getStoragePath());
if (keyGeneratorOpt.isPresent()) {
return fileFormatUtils.fetchHoodieKeys(hoodieTable.getStorage(),
baseFile.getStoragePath(), keyGeneratorOpt);
} else {
diff --git
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestHoodieJavaWriteClientInsert.java
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestHoodieJavaWriteClientInsert.java
index 53d069736e7..60907acec5c 100644
---
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestHoodieJavaWriteClientInsert.java
+++
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestHoodieJavaWriteClientInsert.java
@@ -37,6 +37,7 @@ import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.hadoop.HoodieParquetInputFormat;
import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
+import org.apache.hudi.io.storage.HoodieIOFactory;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.testutils.HoodieJavaClientTestHarness;
@@ -147,7 +148,8 @@ public class TestHoodieJavaWriteClientInsert extends
HoodieJavaClientTestHarness
HoodieJavaWriteClient writeClient = getHoodieWriteClient(config);
metaClient = HoodieTableMetaClient.reload(metaClient);
- FileFormatUtils fileUtils = FileFormatUtils.getInstance(metaClient);
+ FileFormatUtils fileUtils =
HoodieIOFactory.getIOFactory(metaClient.getStorage())
+ .getFileFormatUtils(metaClient.getTableConfig().getBaseFileFormat());
// Get some records belong to the same partition (2021/09/11)
String insertRecordStr1 = "{\"_row_key\":\"1\","
@@ -221,7 +223,8 @@ public class TestHoodieJavaWriteClientInsert extends
HoodieJavaClientTestHarness
HoodieJavaWriteClient writeClient = getHoodieWriteClient(config);
metaClient = HoodieTableMetaClient.reload(metaClient);
- FileFormatUtils fileUtils = FileFormatUtils.getInstance(metaClient);
+ FileFormatUtils fileUtils =
HoodieIOFactory.getIOFactory(metaClient.getStorage())
+ .getFileFormatUtils(metaClient.getTableConfig().getBaseFileFormat());
String partitionPath = "2021/09/11";
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new
String[]{partitionPath});
diff --git
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/functional/TestHoodieJavaClientOnCopyOnWriteStorage.java
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/functional/TestHoodieJavaClientOnCopyOnWriteStorage.java
index ad92748a15e..b195194938d 100644
---
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/functional/TestHoodieJavaClientOnCopyOnWriteStorage.java
+++
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/functional/TestHoodieJavaClientOnCopyOnWriteStorage.java
@@ -65,7 +65,6 @@ import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.testutils.RawTripTestPayload;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.CollectionUtils;
-import org.apache.hudi.common.util.FileFormatUtils;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.MarkerUtils;
import org.apache.hudi.common.util.Option;
@@ -86,6 +85,7 @@ import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.HoodieIndex.IndexType;
import org.apache.hudi.io.HoodieMergeHandle;
+import org.apache.hudi.io.storage.HoodieIOFactory;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.keygen.KeyGenerator;
import org.apache.hudi.keygen.factory.HoodieAvroKeyGeneratorFactory;
@@ -1028,7 +1028,9 @@ public class TestHoodieJavaClientOnCopyOnWriteStorage
extends HoodieJavaClientTe
private Set<String> verifyRecordKeys(List<HoodieRecord> expectedRecords,
List<WriteStatus> allStatus, List<GenericRecord> records) {
for (WriteStatus status : allStatus) {
StoragePath filePath = new StoragePath(basePath,
status.getStat().getPath());
-
records.addAll(FileFormatUtils.getInstance(metaClient).readAvroRecords(storage,
filePath));
+ records.addAll(HoodieIOFactory.getIOFactory(metaClient.getStorage())
+ .getFileFormatUtils(metaClient.getTableConfig().getBaseFileFormat())
+ .readAvroRecords(storage, filePath));
}
Set<String> expectedKeys = recordsToRecordKeySet(expectedRecords);
assertEquals(records.size(), expectedKeys.size());
diff --git
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/table/action/commit/TestJavaCopyOnWriteActionExecutor.java
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/table/action/commit/TestJavaCopyOnWriteActionExecutor.java
index dedf787c127..3cc16928d0a 100644
---
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/table/action/commit/TestJavaCopyOnWriteActionExecutor.java
+++
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/table/action/commit/TestJavaCopyOnWriteActionExecutor.java
@@ -41,6 +41,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.hadoop.HoodieParquetInputFormat;
import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
import org.apache.hudi.io.HoodieCreateHandle;
+import org.apache.hudi.io.storage.HoodieIOFactory;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.HoodieJavaCopyOnWriteTable;
import org.apache.hudi.table.HoodieJavaTable;
@@ -131,7 +132,8 @@ public class TestJavaCopyOnWriteActionExecutor extends
HoodieJavaClientTestHarne
HoodieJavaWriteClient writeClient = getHoodieWriteClient(config);
writeClient.startCommitWithTime(firstCommitTime);
metaClient = HoodieTableMetaClient.reload(metaClient);
- FileFormatUtils fileUtils = FileFormatUtils.getInstance(metaClient);
+ FileFormatUtils fileUtils =
HoodieIOFactory.getIOFactory(metaClient.getStorage())
+ .getFileFormatUtils(metaClient.getTableConfig().getBaseFileFormat());
String partitionPath = "2016/01/31";
@@ -480,7 +482,8 @@ public class TestJavaCopyOnWriteActionExecutor extends
HoodieJavaClientTestHarne
HoodieJavaWriteClient writeClient = getHoodieWriteClient(config);
writeClient.startCommitWithTime(firstCommitTime);
metaClient = HoodieTableMetaClient.reload(metaClient);
- FileFormatUtils fileUtils = FileFormatUtils.getInstance(metaClient);
+ FileFormatUtils fileUtils =
HoodieIOFactory.getIOFactory(metaClient.getStorage())
+ .getFileFormatUtils(metaClient.getTableConfig().getBaseFileFormat());
String partitionPath = "2022/04/09";
diff --git
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java
index 439ab09c897..a36e0a5876c 100644
---
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java
+++
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java
@@ -50,7 +50,6 @@ import
org.apache.hudi.common.table.view.SyncableFileSystemView;
import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.common.testutils.HoodieTestUtils;
-import org.apache.hudi.common.util.FileFormatUtils;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
@@ -62,6 +61,7 @@ import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.JavaHoodieIndexFactory;
+import org.apache.hudi.io.storage.HoodieIOFactory;
import org.apache.hudi.metadata.FileSystemBackedTableMetadata;
import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
import org.apache.hudi.metadata.HoodieTableMetadata;
@@ -908,7 +908,8 @@ public abstract class HoodieJavaClientTestHarness extends
HoodieWriterClientTest
HashMap<String, String> paths =
getLatestFileIDsToFullPath(basePath, commitTimeline,
Arrays.asList(commitInstant));
return paths.values().stream().map(StoragePath::new).flatMap(path ->
- FileFormatUtils.getInstance(path).readAvroRecords(storage,
path).stream())
+ HoodieIOFactory.getIOFactory(storage).getFileFormatUtils(path)
+ .readAvroRecords(storage, path).stream())
.filter(record -> {
if (filterByCommitTime) {
Object commitTime =
record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD);
@@ -937,7 +938,7 @@ public abstract class HoodieJavaClientTestHarness extends
HoodieWriterClientTest
try {
List<HoodieBaseFile> latestFiles = getLatestBaseFiles(basePath, storage,
paths);
return latestFiles.stream().mapToLong(baseFile ->
- FileFormatUtils.getInstance(baseFile.getStoragePath())
+
HoodieIOFactory.getIOFactory(storage).getFileFormatUtils(baseFile.getStoragePath())
.readAvroRecords(storage, baseFile.getStoragePath()).size())
.sum();
} catch (Exception e) {
@@ -975,8 +976,9 @@ public abstract class HoodieJavaClientTestHarness extends
HoodieWriterClientTest
HashMap<String, String> fileIdToFullPath =
getLatestFileIDsToFullPath(basePath, commitTimeline, commitsToReturn);
String[] paths = fileIdToFullPath.values().toArray(new
String[fileIdToFullPath.size()]);
if (paths[0].endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
- return Arrays.stream(paths).map(StoragePath::new).flatMap(path ->
FileFormatUtils.getInstance(path)
- .readAvroRecords(storage, path).stream())
+ return Arrays.stream(paths).map(StoragePath::new).flatMap(path ->
+ HoodieIOFactory.getIOFactory(storage).getFileFormatUtils(path)
+ .readAvroRecords(storage, path).stream())
.filter(record -> {
if (lastCommitTimeOpt.isPresent()) {
Object commitTime =
record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD);
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java
index 49b647eec5f..dc1e5238b2e 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java
@@ -68,7 +68,8 @@ public class HoodieSparkParquetReader implements
HoodieSparkFileReader {
this.storage = storage.newInstance(path, storage.getConf().newInstance());
// Avoid adding record in list element when convert parquet schema to avro
schema
this.storage.getConf().set(ADD_LIST_ELEMENT_RECORDS, "false");
- this.parquetUtils = FileFormatUtils.getInstance(HoodieFileFormat.PARQUET);
+ this.parquetUtils = HoodieIOFactory.getIOFactory(storage)
+ .getFileFormatUtils(HoodieFileFormat.PARQUET);
}
@Override
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
index b96d8723b51..df1ad422f62 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
@@ -27,7 +27,6 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.testutils.RawTripTestPayload;
-import org.apache.hudi.common.util.FileFormatUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieUpsertException;
@@ -35,6 +34,7 @@ import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.io.CreateHandleFactory;
import org.apache.hudi.io.HoodieMergeHandle;
import org.apache.hudi.io.HoodieWriteHandle;
+import org.apache.hudi.io.storage.HoodieIOFactory;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.testutils.HoodieSparkClientTestHarness;
@@ -132,7 +132,8 @@ public class TestUpdateSchemaEvolution extends
HoodieSparkClientTestHarness impl
Executable executable = () -> {
HoodieMergeHandle mergeHandle = new
HoodieMergeHandle(updateTable.getConfig(), "101", updateTable,
updateRecords.iterator(), updateRecords.get(0).getPartitionPath(),
insertResult.getFileId(), supplier, Option.empty());
- List<GenericRecord> oldRecords =
FileFormatUtils.getInstance(updateTable.getBaseFileFormat())
+ List<GenericRecord> oldRecords =
HoodieIOFactory.getIOFactory(updateTable.getStorage())
+ .getFileFormatUtils(updateTable.getBaseFileFormat())
.readAvroRecords(updateTable.getStorage(),
new StoragePath(updateTable.getConfig().getBasePath() + "/" +
insertResult.getStat().getPath()),
mergeHandle.getWriterSchemaWithMetaFields());
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
index 1738414f099..48877b1ea55 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
@@ -105,6 +105,7 @@ import
org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.HoodieIndex.IndexType;
import org.apache.hudi.io.HoodieMergeHandle;
+import org.apache.hudi.io.storage.HoodieIOFactory;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.keygen.KeyGenerator;
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
@@ -1197,7 +1198,8 @@ public class TestHoodieClientOnCopyOnWriteStorage extends
HoodieClientTestBase {
dataGen = new HoodieTestDataGenerator(new String[] {testPartitionPath});
SparkRDDWriteClient client = getHoodieWriteClient(config);
- FileFormatUtils fileUtils = FileFormatUtils.getInstance(metaClient);
+ FileFormatUtils fileUtils =
HoodieIOFactory.getIOFactory(metaClient.getStorage())
+ .getFileFormatUtils(metaClient.getTableConfig().getBaseFileFormat());
// Inserts => will write file1
String commitTime1 = "001";
@@ -1310,7 +1312,8 @@ public class TestHoodieClientOnCopyOnWriteStorage extends
HoodieClientTestBase {
HoodieWriteConfig config = getSmallInsertWriteConfig(insertSplitLimit,
false, mergeAllowDuplicateInserts); // hold upto 200 records max
dataGen = new HoodieTestDataGenerator(new String[] {testPartitionPath});
SparkRDDWriteClient client = getHoodieWriteClient(config);
- FileFormatUtils fileUtils = FileFormatUtils.getInstance(metaClient);
+ FileFormatUtils fileUtils =
HoodieIOFactory.getIOFactory(metaClient.getStorage())
+ .getFileFormatUtils(metaClient.getTableConfig().getBaseFileFormat());
// Inserts => will write file1
String commitTime1 = "001";
@@ -1407,8 +1410,9 @@ public class TestHoodieClientOnCopyOnWriteStorage extends
HoodieClientTestBase {
assertEquals(1, statuses.size(), "Just 1 file needs to be added.");
String file1 = statuses.get(0).getFileId();
- assertEquals(100,
- FileFormatUtils.getInstance(metaClient).readRowKeys(storage, new
StoragePath(basePath, statuses.get(0).getStat().getPath()))
+ assertEquals(100, HoodieIOFactory.getIOFactory(metaClient.getStorage())
+ .getFileFormatUtils(metaClient.getTableConfig().getBaseFileFormat())
+ .readRowKeys(storage, new StoragePath(basePath,
statuses.get(0).getStat().getPath()))
.size(), "file should contain 100 records");
// Delete 20 among 100 inserted
@@ -2090,7 +2094,9 @@ public class TestHoodieClientOnCopyOnWriteStorage extends
HoodieClientTestBase {
private Set<String> verifyRecordKeys(List<HoodieRecord> expectedRecords,
List<WriteStatus> allStatus, List<GenericRecord> records) {
for (WriteStatus status : allStatus) {
StoragePath filePath = new StoragePath(basePath,
status.getStat().getPath());
-
records.addAll(FileFormatUtils.getInstance(metaClient).readAvroRecords(storage,
filePath));
+ records.addAll(HoodieIOFactory.getIOFactory(metaClient.getStorage())
+ .getFileFormatUtils(metaClient.getTableConfig().getBaseFileFormat())
+ .readAvroRecords(storage, filePath));
}
Set<String> expectedKeys = recordsToRecordKeySet(expectedRecords);
assertEquals(records.size(), expectedKeys.size());
@@ -2179,10 +2185,14 @@ public class TestHoodieClientOnCopyOnWriteStorage
extends HoodieClientTestBase {
StoragePath newFile = new StoragePath(basePath,
statuses.get(0).getStat().getPath());
assertEquals(expectedRecords,
- FileFormatUtils.getInstance(metaClient).readRowKeys(storage,
newFile).size(),
+ HoodieIOFactory.getIOFactory(metaClient.getStorage())
+
.getFileFormatUtils(metaClient.getTableConfig().getBaseFileFormat())
+ .readRowKeys(storage, newFile).size(),
"file should contain 110 records");
- List<GenericRecord> records =
FileFormatUtils.getInstance(metaClient).readAvroRecords(storage, newFile);
+ List<GenericRecord> records =
HoodieIOFactory.getIOFactory(metaClient.getStorage())
+ .getFileFormatUtils(metaClient.getTableConfig().getBaseFileFormat())
+ .readAvroRecords(storage, newFile);
for (GenericRecord record : records) {
String recordKey =
record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
assertTrue(keys.contains(recordKey), "key expected to be part of " +
instantTime);
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java
index 285383db036..03f0cf158cd 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java
@@ -36,7 +36,6 @@ import
org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.testutils.RawTripTestPayload;
import org.apache.hudi.common.testutils.Transformations;
-import org.apache.hudi.common.util.FileFormatUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieIndexConfig;
@@ -47,6 +46,7 @@ import org.apache.hudi.hadoop.HoodieParquetInputFormat;
import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.io.HoodieCreateHandle;
+import org.apache.hudi.io.storage.HoodieIOFactory;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.HoodieSparkCopyOnWriteTable;
@@ -205,14 +205,15 @@ public class TestCopyOnWriteActionExecutor extends
HoodieClientTestBase implemen
// Read out the bloom filter and make sure filter can answer record exist
or not
Path filePath = allFiles[0].getPath();
- BloomFilter filter = FileFormatUtils.getInstance(table.getBaseFileFormat())
+ BloomFilter filter =
HoodieIOFactory.getIOFactory(storage).getFileFormatUtils(table.getBaseFileFormat())
.readBloomFilterFromMetadata(storage, new
StoragePath(filePath.toUri()));
for (HoodieRecord record : records) {
assertTrue(filter.mightContain(record.getRecordKey()));
}
// Read the base file, check the record content
- List<GenericRecord> fileRecords =
FileFormatUtils.getInstance(table.getBaseFileFormat())
+ List<GenericRecord> fileRecords = HoodieIOFactory.getIOFactory(storage)
+ .getFileFormatUtils(table.getBaseFileFormat())
.readAvroRecords(storage, new StoragePath(filePath.toUri()));
GenericRecord newRecord;
int index = 0;
@@ -247,8 +248,9 @@ public class TestCopyOnWriteActionExecutor extends
HoodieClientTestBase implemen
// Check whether the record has been updated
Path updatedFilePath = allFiles[0].getPath();
- BloomFilter updatedFilter =
-
FileFormatUtils.getInstance(metaClient).readBloomFilterFromMetadata(storage,
new StoragePath(updatedFilePath.toUri()));
+ BloomFilter updatedFilter =
HoodieIOFactory.getIOFactory(metaClient.getStorage())
+ .getFileFormatUtils(metaClient.getTableConfig().getBaseFileFormat())
+ .readBloomFilterFromMetadata(storage, new
StoragePath(updatedFilePath.toUri()));
for (HoodieRecord record : records) {
// No change to the _row_key
assertTrue(updatedFilter.mightContain(record.getRecordKey()));
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java
index 9256e6f4440..16fd7d2f434 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.util.RetryHelper;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.io.storage.HoodieIOFactory;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
@@ -137,7 +138,8 @@ public class HoodiePartitionMetadata {
HOODIE_PARTITION_METAFILE_PREFIX + "_" + UUID.randomUUID() +
getMetafileExtension());
try {
// write to temporary file
- FileFormatUtils.getInstance(format).writeMetaFile(storage, tmpPath,
props);
+ HoodieIOFactory.getIOFactory(storage).getFileFormatUtils(format)
+ .writeMetaFile(storage, tmpPath, props);
// move to actual path
storage.rename(tmpPath, filePath);
} finally {
@@ -185,7 +187,8 @@ public class HoodiePartitionMetadata {
private boolean readBaseFormatMetaFile() {
for (StoragePath metafilePath : baseFormatMetaFilePaths(partitionPath)) {
try {
- FileFormatUtils reader = FileFormatUtils.getInstance(metafilePath);
+ FileFormatUtils reader = HoodieIOFactory.getIOFactory(storage)
+ .getFileFormatUtils(metafilePath);
// Data file format
Map<String, String> metadata = reader.readFooter(
storage, true, metafilePath, PARTITION_DEPTH_KEY, COMMIT_TIME_KEY);
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
index bf77a712c58..08a76722f5c 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
@@ -32,7 +32,6 @@ import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.util.FileFormatUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
@@ -43,6 +42,7 @@ import org.apache.hudi.internal.schema.HoodieSchemaException;
import org.apache.hudi.internal.schema.InternalSchema;
import
org.apache.hudi.internal.schema.io.FileBasedInternalSchemaStorageManager;
import org.apache.hudi.internal.schema.utils.SerDeHelper;
+import org.apache.hudi.io.storage.HoodieIOFactory;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.util.Lazy;
@@ -302,7 +302,8 @@ public class TableSchemaResolver {
.orElseThrow(() -> new IllegalArgumentException("Could not find any
data file written for compaction "
+ lastCompactionCommit + ", could not get schema for table " +
metaClient.getBasePath()));
StoragePath path = new StoragePath(filePath);
- return
FileFormatUtils.getInstance(path).readAvroSchema(metaClient.getStorage(), path);
+ return HoodieIOFactory.getIOFactory(metaClient.getStorage())
+ .getFileFormatUtils(path).readAvroSchema(metaClient.getStorage(),
path);
}
private Schema readSchemaFromLogFile(StoragePath path) throws IOException {
@@ -469,7 +470,8 @@ public class TableSchemaResolver {
// this is a log file
schema = readSchemaFromLogFile(filePath);
} else {
- schema =
FileFormatUtils.getInstance(filePath).readAvroSchema(metaClient.getStorage(),
filePath);
+ schema = HoodieIOFactory.getIOFactory(metaClient.getStorage())
+
.getFileFormatUtils(filePath).readAvroSchema(metaClient.getStorage(), filePath);
}
}
return schema;
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
index e997f5e9aaa..873aa8f431e 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
@@ -24,7 +24,6 @@ import org.apache.hudi.common.config.HoodieReaderConfig;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
-import org.apache.hudi.common.util.FileFormatUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.CloseableMappingIterator;
@@ -105,9 +104,10 @@ public class HoodieHFileDataBlock extends HoodieDataBlock {
protected byte[] serializeRecords(List<HoodieRecord> records, HoodieStorage
storage) throws IOException {
Schema writerSchema = new Schema.Parser().parse(
super.getLogBlockHeader().get(HoodieLogBlock.HeaderMetadataType.SCHEMA));
- return
FileFormatUtils.getInstance(HoodieFileFormat.HFILE).serializeRecordsToLogBlock(
- storage, records, writerSchema, getSchema(), getKeyFieldName(),
- Collections.singletonMap(HFILE_COMPRESSION_ALGORITHM_NAME.key(),
compressionCodec.get()));
+ return
HoodieIOFactory.getIOFactory(storage).getFileFormatUtils(HoodieFileFormat.HFILE)
+ .serializeRecordsToLogBlock(
+ storage, records, writerSchema, getSchema(), getKeyFieldName(),
+ Collections.singletonMap(HFILE_COMPRESSION_ALGORITHM_NAME.key(),
compressionCodec.get()));
}
@Override
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
index d96941e592f..265313b722e 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
@@ -20,7 +20,6 @@ package org.apache.hudi.common.table.log.block;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
-import org.apache.hudi.common.util.FileFormatUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.io.SeekableDataInputStream;
@@ -96,8 +95,9 @@ public class HoodieParquetDataBlock extends HoodieDataBlock {
Schema writerSchema = new Schema.Parser().parse(
super.getLogBlockHeader().get(HoodieLogBlock.HeaderMetadataType.SCHEMA));
- return FileFormatUtils.getInstance(PARQUET).serializeRecordsToLogBlock(
- storage, records, writerSchema, getSchema(), getKeyFieldName(),
paramsMap);
+ return HoodieIOFactory.getIOFactory(storage).getFileFormatUtils(PARQUET)
+ .serializeRecordsToLogBlock(
+ storage, records, writerSchema, getSchema(), getKeyFieldName(),
paramsMap);
}
/**
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/util/FileFormatUtils.java
b/hudi-common/src/main/java/org/apache/hudi/common/util/FileFormatUtils.java
index e12b5a05ec8..c6ea01a1688 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/FileFormatUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/FileFormatUtils.java
@@ -25,7 +25,6 @@ import org.apache.hudi.common.bloom.BloomFilterTypeCode;
import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.exception.HoodieException;
@@ -47,36 +46,6 @@ import java.util.Set;
* Utils for file format used in Hudi.
*/
public abstract class FileFormatUtils {
- public static final String PARQUET_UTILS =
"org.apache.hudi.common.util.ParquetUtils";
- public static final String ORC_UTILS =
"org.apache.hudi.common.util.OrcUtils";
- public static final String HFILE_UTILS =
"org.apache.hudi.common.util.HFileUtils";
-
- public static FileFormatUtils getInstance(StoragePath path) {
- if
(path.getFileExtension().equals(HoodieFileFormat.PARQUET.getFileExtension())) {
- return ReflectionUtils.loadClass(PARQUET_UTILS);
- } else if
(path.getFileExtension().equals(HoodieFileFormat.ORC.getFileExtension())) {
- return ReflectionUtils.loadClass(ORC_UTILS);
- } else if
(path.getFileExtension().equals(HoodieFileFormat.HFILE.getFileExtension())) {
- return ReflectionUtils.loadClass(HFILE_UTILS);
- }
- throw new UnsupportedOperationException("The format for file " + path + "
is not supported yet.");
- }
-
- public static FileFormatUtils getInstance(HoodieFileFormat fileFormat) {
- if (HoodieFileFormat.PARQUET.equals(fileFormat)) {
- return ReflectionUtils.loadClass(PARQUET_UTILS);
- } else if (HoodieFileFormat.ORC.equals(fileFormat)) {
- return ReflectionUtils.loadClass(ORC_UTILS);
- } else if (HoodieFileFormat.HFILE.equals(fileFormat)) {
- return ReflectionUtils.loadClass(HFILE_UTILS);
- }
- throw new UnsupportedOperationException(fileFormat.name() + " format not
supported yet.");
- }
-
- public static FileFormatUtils getInstance(HoodieTableMetaClient metaClient) {
- return getInstance(metaClient.getTableConfig().getBaseFileFormat());
- }
-
/**
* Read the rowKey list from the given data file.
*
diff --git
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieIOFactory.java
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieIOFactory.java
index cba3c7b0e98..e1cff2a0424 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieIOFactory.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieIOFactory.java
@@ -19,17 +19,23 @@
package org.apache.hudi.io.storage;
+import org.apache.hudi.ApiMaturityLevel;
+import org.apache.hudi.PublicAPIClass;
+import org.apache.hudi.PublicAPIMethod;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.fs.ConsistencyGuard;
+import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.FileFormatUtils;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
/**
- * Base class to get HoodieFileReaderFactory and HoodieFileWriterFactory
+ * Base class to get {@link HoodieFileReaderFactory}, {@link
HoodieFileWriterFactory}, and {@link FileFormatUtils}
*/
+@PublicAPIClass(maturity = ApiMaturityLevel.EVOLVING)
public abstract class HoodieIOFactory {
protected final HoodieStorage storage;
@@ -48,12 +54,45 @@ public abstract class HoodieIOFactory {
}
}
+ /**
+ * @param recordType {@link HoodieRecord} type.
+ * @return a factory to create file readers.
+ */
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
public abstract HoodieFileReaderFactory
getReaderFactory(HoodieRecord.HoodieRecordType recordType);
+ /**
+ * @param recordType {@link HoodieRecord} type.
+ * @return a factory to create file writers.
+ */
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
public abstract HoodieFileWriterFactory
getWriterFactory(HoodieRecord.HoodieRecordType recordType);
+ /**
+ * @param fileFormat file format supported in Hudi.
+ * @return a util class to support read and write in the given file format.
+ */
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+ public abstract FileFormatUtils getFileFormatUtils(HoodieFileFormat
fileFormat);
+
+ /**
+ * @param storagePath file path.
+ * @return {@link HoodieStorage} instance.
+ */
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
public abstract HoodieStorage getStorage(StoragePath storagePath);
+ /**
+ * @param path file path.
+ * @param enableRetry whether to retry operations.
+ * @param maxRetryIntervalMs maximum retry interval in milliseconds.
+ * @param maxRetryNumbers maximum number of retries.
+ * @param initialRetryIntervalMs initial delay before retry in milliseconds.
+ * @param retryExceptions retry exception list.
+ * @param consistencyGuard {@link ConsistencyGuard} instance.
+ * @return {@link HoodieStorage} instance with retry capability if
applicable.
+ */
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
public abstract HoodieStorage getStorage(StoragePath path,
boolean enableRetry,
long maxRetryIntervalMs,
@@ -61,4 +100,19 @@ public abstract class HoodieIOFactory {
long initialRetryIntervalMs,
String retryExceptions,
ConsistencyGuard consistencyGuard);
+
+ /**
+ * @param path file path.
+ * @return a util class to support read and write in the file format inferred from the path's file extension.
+ */
+ public final FileFormatUtils getFileFormatUtils(StoragePath path) {
+ if
(path.getFileExtension().equals(HoodieFileFormat.PARQUET.getFileExtension())) {
+ return getFileFormatUtils(HoodieFileFormat.PARQUET);
+ } else if
(path.getFileExtension().equals(HoodieFileFormat.ORC.getFileExtension())) {
+ return getFileFormatUtils(HoodieFileFormat.ORC);
+ } else if
(path.getFileExtension().equals(HoodieFileFormat.HFILE.getFileExtension())) {
+ return getFileFormatUtils(HoodieFileFormat.HFILE);
+ }
+ throw new UnsupportedOperationException("The format for file " + path + "
is not supported yet.");
+ }
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 7406943eb47..217ada6b3b1 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -58,7 +58,6 @@ import
org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.ExternalFilePathUtil;
-import org.apache.hudi.common.util.FileFormatUtils;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.HoodieRecordUtils;
import org.apache.hudi.common.util.Option;
@@ -1175,7 +1174,8 @@ public class HoodieTableMetadataUtil {
try {
if (filePath.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
StoragePath fullFilePath = new
StoragePath(datasetMetaClient.getBasePathV2(), filePath);
- return FileFormatUtils.getInstance(HoodieFileFormat.PARQUET)
+ return HoodieIOFactory.getIOFactory(datasetMetaClient.getStorage())
+ .getFileFormatUtils(HoodieFileFormat.PARQUET)
.readColumnStatsFromMetadata(datasetMetaClient.getStorage(),
fullFilePath, columnsToIndex);
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
index e654209e87b..bfb22dc89d2 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
@@ -38,6 +38,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.io.storage.HoodieIOFactory;
import org.apache.hudi.sink.bootstrap.aggregate.BootstrapAggFunction;
import org.apache.hudi.sink.meta.CkpMetadata;
import org.apache.hudi.table.HoodieTable;
@@ -200,7 +201,8 @@ public class BootstrapOperator<I, O extends HoodieRecord<?>>
Option<HoodieInstant> latestCommitTime =
commitsTimeline.filterCompletedInstants().lastInstant();
if (latestCommitTime.isPresent()) {
- FileFormatUtils fileUtils =
FileFormatUtils.getInstance(this.hoodieTable.getBaseFileFormat());
+ FileFormatUtils fileUtils =
HoodieIOFactory.getIOFactory(hoodieTable.getStorage())
+ .getFileFormatUtils(hoodieTable.getBaseFileFormat());
Schema schema = new
TableSchemaResolver(this.hoodieTable.getMetaClient()).getTableAvroSchema();
List<FileSlice> fileSlices = this.hoodieTable.getSliceView()
diff --git
a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroOrcReader.java
b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroOrcReader.java
index c709c5ef4f4..a2358d6cac3 100644
---
a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroOrcReader.java
+++
b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroOrcReader.java
@@ -27,6 +27,7 @@ import
org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.io.storage.HoodieAvroFileReader;
import org.apache.hudi.io.storage.HoodieFileReader;
+import org.apache.hudi.io.storage.HoodieIOFactory;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
@@ -57,7 +58,7 @@ public class HoodieAvroOrcReader extends HoodieAvroFileReader
{
public HoodieAvroOrcReader(HoodieStorage storage, StoragePath path) {
this.storage = storage;
this.path = path;
- this.orcUtils = FileFormatUtils.getInstance(HoodieFileFormat.ORC);
+ this.orcUtils =
HoodieIOFactory.getIOFactory(storage).getFileFormatUtils(HoodieFileFormat.ORC);
}
@Override
diff --git
a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReader.java
b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReader.java
index 22af48fc7b7..cef11b0ef08 100644
---
a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReader.java
+++
b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReader.java
@@ -31,6 +31,7 @@ import
org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.CloseableMappingIterator;
import org.apache.hudi.io.storage.HoodieAvroFileReader;
import org.apache.hudi.io.storage.HoodieFileReader;
+import org.apache.hudi.io.storage.HoodieIOFactory;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
@@ -67,7 +68,8 @@ public class HoodieAvroParquetReader extends
HoodieAvroFileReader {
// by the Reader (for proper config propagation to Parquet components)
this.storage = storage.newInstance(path,
tryOverrideDefaultConfigs(storage.getConf().newInstance()));
this.path = path;
- this.parquetUtils = FileFormatUtils.getInstance(HoodieFileFormat.PARQUET);
+ this.parquetUtils = HoodieIOFactory.getIOFactory(storage)
+ .getFileFormatUtils(HoodieFileFormat.PARQUET);
}
@Override
diff --git
a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieHadoopIOFactory.java
b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieHadoopIOFactory.java
index 4203fe90b4b..3b32d67a7f9 100644
---
a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieHadoopIOFactory.java
+++
b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieHadoopIOFactory.java
@@ -20,7 +20,12 @@
package org.apache.hudi.io.hadoop;
import org.apache.hudi.common.fs.ConsistencyGuard;
+import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.FileFormatUtils;
+import org.apache.hudi.common.util.HFileUtils;
+import org.apache.hudi.common.util.OrcUtils;
+import org.apache.hudi.common.util.ParquetUtils;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
@@ -79,6 +84,20 @@ public class HoodieHadoopIOFactory extends HoodieIOFactory {
}
}
+ @Override
+ public FileFormatUtils getFileFormatUtils(HoodieFileFormat fileFormat) {
+ switch (fileFormat) {
+ case PARQUET:
+ return new ParquetUtils();
+ case ORC:
+ return new OrcUtils();
+ case HFILE:
+ return new HFileUtils();
+ default:
+ throw new UnsupportedOperationException(fileFormat.name() + " format
not supported yet.");
+ }
+ }
+
@Override
public HoodieStorage getStorage(StoragePath storagePath) {
return storage.newInstance(storagePath, storage.getConf());
diff --git
a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHadoopIOFactory.java
b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHadoopIOFactory.java
new file mode 100644
index 00000000000..7aaf811e737
--- /dev/null
+++
b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHadoopIOFactory.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io.hadoop;
+
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.util.HFileUtils;
+import org.apache.hudi.common.util.OrcUtils;
+import org.apache.hudi.common.util.ParquetUtils;
+import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.io.storage.HoodieIOFactory;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+
+import static
org.apache.hudi.common.testutils.HoodieTestUtils.getDefaultStorageConf;
+import static org.apache.hudi.storage.HoodieStorageUtils.DEFAULT_URI;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+
+/**
+ * Tests {@link HoodieHadoopIOFactory}
+ */
+public class TestHoodieHadoopIOFactory {
+ @Test
+ public void testGetFileFormatUtils() throws IOException {
+ try (HoodieStorage storage =
+ new HoodieHadoopStorage(HadoopFSUtils.getFs(DEFAULT_URI,
getDefaultStorageConf()))) {
+ HoodieIOFactory ioFactory = new HoodieHadoopIOFactory(storage);
+ assertTrue(ioFactory.getFileFormatUtils(new
StoragePath("file:///a/b.parquet")) instanceof ParquetUtils);
+ assertTrue(ioFactory.getFileFormatUtils(new
StoragePath("file:///a/b.orc")) instanceof OrcUtils);
+ assertTrue(ioFactory.getFileFormatUtils(new
StoragePath("file:///a/b.hfile")) instanceof HFileUtils);
+ assertThrows(
+ UnsupportedOperationException.class,
+ () -> ioFactory.getFileFormatUtils(new
StoragePath("file:///a/b.log")));
+
+ assertTrue(ioFactory.getFileFormatUtils(HoodieFileFormat.PARQUET)
instanceof ParquetUtils);
+ assertTrue(ioFactory.getFileFormatUtils(HoodieFileFormat.ORC) instanceof
OrcUtils);
+ assertTrue(ioFactory.getFileFormatUtils(HoodieFileFormat.HFILE)
instanceof HFileUtils);
+ assertThrows(
+ UnsupportedOperationException.class,
+ () -> ioFactory.getFileFormatUtils(HoodieFileFormat.HOODIE_LOG));
+ }
+ }
+}
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/SparkHelpers.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/SparkHelpers.scala
index e534a13d766..246c266d467 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/SparkHelpers.scala
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/SparkHelpers.scala
@@ -23,9 +23,9 @@ import org.apache.hudi.common.bloom.{BloomFilter,
BloomFilterFactory}
import org.apache.hudi.common.config.HoodieStorageConfig
import
org.apache.hudi.common.config.HoodieStorageConfig.{BLOOM_FILTER_DYNAMIC_MAX_ENTRIES,
BLOOM_FILTER_FPP_VALUE, BLOOM_FILTER_NUM_ENTRIES_VALUE, BLOOM_FILTER_TYPE}
import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord}
-import org.apache.hudi.common.util.{FileFormatUtils, Option}
+import org.apache.hudi.common.util.Option
import org.apache.hudi.io.hadoop.HoodieAvroParquetWriter
-import org.apache.hudi.io.storage.HoodieParquetConfig
+import org.apache.hudi.io.storage.{HoodieIOFactory, HoodieParquetConfig}
import org.apache.hudi.storage.{HoodieStorage, StorageConfiguration,
StoragePath}
import org.apache.avro.Schema
@@ -48,7 +48,9 @@ object SparkHelpers {
sourceFile: StoragePath,
destinationFile: StoragePath,
keysToSkip: Set[String]) {
- val sourceRecords =
FileFormatUtils.getInstance(HoodieFileFormat.PARQUET).readAvroRecords(storage,
sourceFile).asScala
+ val sourceRecords = HoodieIOFactory.getIOFactory(storage)
+ .getFileFormatUtils(HoodieFileFormat.PARQUET)
+ .readAvroRecords(storage, sourceFile).asScala
val schema: Schema = sourceRecords.head.getSchema
val filter: BloomFilter = BloomFilterFactory.createBloomFilter(
BLOOM_FILTER_NUM_ENTRIES_VALUE.defaultValue.toInt,
BLOOM_FILTER_FPP_VALUE.defaultValue.toDouble,
@@ -140,7 +142,9 @@ class SparkHelper(sqlContext: SQLContext, fs: FileSystem) {
* @return <pre>true</pre> if all keys are added to the bloom filter;
<pre>false</pre> otherwise.
*/
def fileKeysAgainstBF(storage: HoodieStorage, sqlContext: SQLContext, file:
String): Boolean = {
- val bf =
FileFormatUtils.getInstance(HoodieFileFormat.PARQUET).readBloomFilterFromMetadata(storage,
new StoragePath(file))
+ val bf = HoodieIOFactory.getIOFactory(storage)
+ .getFileFormatUtils(HoodieFileFormat.PARQUET)
+ .readBloomFilterFromMetadata(storage, new StoragePath(file))
val foundCount = sqlContext.parquetFile(file)
.select(s"`${HoodieRecord.RECORD_KEY_METADATA_FIELD}`")
.collect().count(r => !bf.mightContain(r.getString(0)))
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/ColumnStatsIndexHelper.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/ColumnStatsIndexHelper.java
index 6653c9cf969..357200f5f0e 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/ColumnStatsIndexHelper.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/ColumnStatsIndexHelper.java
@@ -18,8 +18,6 @@
package org.apache.hudi;
import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
-import org.apache.hudi.common.model.HoodieFileFormat;
-import org.apache.hudi.common.util.FileFormatUtils;
import org.apache.hudi.common.util.ParquetUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
@@ -176,7 +174,7 @@ public class ColumnStatsIndexHelper {
colMinMaxInfos =
jsc.parallelize(baseFilesPaths, numParallelism)
.mapPartitions(paths -> {
- ParquetUtils utils = (ParquetUtils)
FileFormatUtils.getInstance(HoodieFileFormat.PARQUET);
+ ParquetUtils utils = new ParquetUtils();
Iterable<String> iterable = () -> paths;
return StreamSupport.stream(iterable.spliterator(), false)
.flatMap(path -> {
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
index bfb9e18af1b..b291c2ccae3 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
@@ -62,6 +62,7 @@ import org.apache.hudi.exception.HoodieValidationException;
import org.apache.hudi.exception.TableNotFoundException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.io.storage.HoodieFileReader;
+import org.apache.hudi.io.storage.HoodieIOFactory;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;
import org.apache.hudi.metadata.MetadataPartitionType;
@@ -1440,11 +1441,13 @@ public class HoodieMetadataTableValidator implements
Serializable {
.sorted(new HoodieColumnRangeMetadataComparator())
.collect(Collectors.toList());
} else {
+ FileFormatUtils formatUtils =
HoodieIOFactory.getIOFactory(metaClient.getStorage())
+ .getFileFormatUtils(HoodieFileFormat.PARQUET);
return baseFileNameList.stream().flatMap(filename ->
-
FileFormatUtils.getInstance(HoodieFileFormat.PARQUET).readColumnStatsFromMetadata(
- metaClient.getStorage(),
- new
StoragePath(FSUtils.constructAbsolutePath(metaClient.getBasePathV2(),
partitionPath), filename),
- allColumnNameList).stream())
+ formatUtils.readColumnStatsFromMetadata(
+ metaClient.getStorage(),
+ new
StoragePath(FSUtils.constructAbsolutePath(metaClient.getBasePathV2(),
partitionPath), filename),
+ allColumnNameList).stream())
.sorted(new HoodieColumnRangeMetadataComparator())
.collect(Collectors.toList());
}