This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new ea4f14c2851 [HUDI-7744] Introduce IOFactory and a config to set the factory (#11192)
ea4f14c2851 is described below
commit ea4f14c28515ea9a60e9fe74fe18ef0cb497a1b0
Author: Jon Vexler <[email protected]>
AuthorDate: Mon May 13 13:37:09 2024 -0700
[HUDI-7744] Introduce IOFactory and a config to set the factory (#11192)
Co-authored-by: Jonathan Vexler <=>
Co-authored-by: Y Ethan Guo <[email protected]>
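A minimal sketch of the call pattern this change migrates callers to, assuming
a HoodieConfig, a StorageConfiguration, and a StoragePath are already in scope
(as they are at each call site touched below); this is an illustration, not
part of the patch:

    import org.apache.hudi.common.config.HoodieConfig;
    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.io.storage.HoodieFileReader;
    import org.apache.hudi.io.storage.HoodieIOFactory;
    import org.apache.hudi.storage.StorageConfiguration;
    import org.apache.hudi.storage.StoragePath;

    class IOFactorySketch {
      // Resolves the reader factory through the pluggable HoodieIOFactory
      // instead of the static HoodieFileReaderFactory.getReaderFactory(...)
      // that this commit removes.
      static HoodieFileReader openAvroReader(HoodieConfig config,
                                             StorageConfiguration<?> storageConf,
                                             StoragePath path) throws java.io.IOException {
        return HoodieIOFactory.getIOFactory(storageConf)
            .getReaderFactory(HoodieRecord.HoodieRecordType.AVRO)
            .getFileReader(config, storageConf, path);
      }
    }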
---
.../hudi/client/timeline/LSMTimelineWriter.java | 5 +-
.../org/apache/hudi/index/HoodieIndexUtils.java | 5 +-
.../java/org/apache/hudi/io/HoodieMergeHandle.java | 5 +-
.../java/org/apache/hudi/io/HoodieReadHandle.java | 6 +-
.../table/action/commit/HoodieMergeHelper.java | 9 +--
.../GenericRecordValidationTestUtils.java | 7 ++-
.../run/strategy/JavaExecutionStrategy.java | 6 +-
.../hudi/client/TestJavaHoodieBackedMetadata.java | 6 +-
.../MultipleSparkJobExecutionStrategy.java | 8 +--
.../strategy/SingleSparkJobExecutionStrategy.java | 5 +-
.../hudi/io/storage/HoodieSparkIOFactory.java | 49 ++++++++++++++++
.../bootstrap/ParquetBootstrapMetadataHandler.java | 4 +-
.../functional/TestHoodieBackedMetadata.java | 10 ++--
.../functional/TestHoodieBackedTableMetadata.java | 4 +-
.../hudi/common/config/HoodieStorageConfig.java | 8 +++
.../table/log/block/HoodieHFileDataBlock.java | 20 +++----
.../table/log/block/HoodieParquetDataBlock.java | 4 +-
.../table/timeline/HoodieArchivedTimeline.java | 5 +-
.../hudi/io/storage/HoodieFileReaderFactory.java | 27 ---------
.../hudi/io/storage/HoodieFileWriterFactory.java | 28 +--------
.../apache/hudi/io/storage/HoodieIOFactory.java | 51 ++++++++++++++++
.../hudi/metadata/HoodieBackedTableMetadata.java | 4 +-
.../hudi/metadata/HoodieTableMetadataUtil.java | 14 ++---
.../testutils/reader/HoodieTestReaderContext.java | 4 +-
.../hudi/sink/clustering/ClusteringOperator.java | 7 ++-
.../org/apache/hudi/common/util/HFileUtils.java | 5 +-
.../hudi/io/storage/HoodieHadoopIOFactory.java | 68 ++++++++++++++++++++++
.../io/hadoop/TestHoodieAvroFileReaderFactory.java | 8 ++-
.../hudi/io/hadoop/TestHoodieOrcReaderWriter.java | 4 +-
.../hudi/hadoop/HoodieHFileRecordReader.java | 8 ++-
.../utils/HoodieRealtimeRecordReaderUtils.java | 8 ++-
.../reader/DFSHoodieDatasetInputReader.java | 5 +-
.../main/scala/org/apache/hudi/DefaultSource.scala | 6 +-
.../scala/org/apache/hudi/HoodieBaseRelation.scala | 4 +-
.../utilities/HoodieMetadataTableValidator.java | 4 +-
35 files changed, 283 insertions(+), 138 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/LSMTimelineWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/LSMTimelineWriter.java
index b7ad9bd57d8..a720819ee88 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/LSMTimelineWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/LSMTimelineWriter.java
@@ -39,9 +39,9 @@ import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.io.hadoop.HoodieAvroParquetReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.io.storage.HoodieFileWriter;
import org.apache.hudi.io.storage.HoodieFileWriterFactory;
+import org.apache.hudi.io.storage.HoodieIOFactory;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.HoodieTable;
@@ -269,7 +269,8 @@ public class LSMTimelineWriter {
try (HoodieFileWriter writer = openWriter(new StoragePath(metaClient.getArchivePath(), compactedFileName))) {
for (String fileName : candidateFiles) {
// Read the input source file
- try (HoodieAvroParquetReader reader = (HoodieAvroParquetReader) HoodieFileReaderFactory.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO)
+ try (HoodieAvroParquetReader reader = (HoodieAvroParquetReader) HoodieIOFactory.getIOFactory(metaClient.getStorageConf())
+ .getReaderFactory(HoodieRecord.HoodieRecordType.AVRO)
.getFileReader(config, metaClient.getStorageConf(), new StoragePath(metaClient.getArchivePath(), fileName))) {
// Read the meta entry
try (ClosableIterator<IndexedRecord> iterator = reader.getIndexedRecordIterator(HoodieLSMTimelineInstant.getClassSchema(), HoodieLSMTimelineInstant.getClassSchema())) {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
index afde21e64ca..49e6a49f5c0 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
@@ -45,7 +45,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIndexException;
import org.apache.hudi.io.HoodieMergedReadHandle;
import org.apache.hudi.io.storage.HoodieFileReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.io.storage.HoodieIOFactory;
import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
@@ -183,7 +183,8 @@ public class HoodieIndexUtils {
StorageConfiguration<?> configuration) throws HoodieIndexException {
checkArgument(FSUtils.isBaseFile(filePath));
List<Pair<String, Long>> foundRecordKeys = new ArrayList<>();
- try (HoodieFileReader fileReader = HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO)
+ try (HoodieFileReader fileReader = HoodieIOFactory.getIOFactory(configuration)
+ .getReaderFactory(HoodieRecordType.AVRO)
.getFileReader(DEFAULT_HUDI_CONFIG_FOR_READER, configuration, filePath)) {
// Load all rowKeys from the file, to double-confirm
if (!candidateRecordKeys.isEmpty()) {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index 7eb0a5fb52f..f04a015e423 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -44,9 +44,9 @@ import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.io.storage.HoodieFileReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.io.storage.HoodieFileWriter;
import org.apache.hudi.io.storage.HoodieFileWriterFactory;
+import org.apache.hudi.io.storage.HoodieIOFactory;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.HoodieTable;
@@ -470,7 +470,8 @@ public class HoodieMergeHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O>
}
long oldNumWrites = 0;
- try (HoodieFileReader reader = HoodieFileReaderFactory.getReaderFactory(this.recordMerger.getRecordType())
+ try (HoodieFileReader reader = HoodieIOFactory.getIOFactory(storage.getConf())
+ .getReaderFactory(this.recordMerger.getRecordType())
.getFileReader(config, hoodieTable.getStorageConf(), oldFilePath)) {
oldNumWrites = reader.getTotalRecords();
} catch (IOException e) {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieReadHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieReadHandle.java
index 5f9afc1bad1..01678b68e96 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieReadHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieReadHandle.java
@@ -23,7 +23,7 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.io.storage.HoodieFileReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.io.storage.HoodieIOFactory;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.table.HoodieTable;
@@ -69,12 +69,12 @@ public abstract class HoodieReadHandle<T, I, K, O> extends HoodieIOHandle<T, I,
}
protected HoodieFileReader createNewFileReader() throws IOException {
- return HoodieFileReaderFactory.getReaderFactory(this.config.getRecordMerger().getRecordType())
+ return HoodieIOFactory.getIOFactory(storage.getConf()).getReaderFactory(this.config.getRecordMerger().getRecordType())
.getFileReader(config, hoodieTable.getStorageConf(), getLatestBaseFile().getStoragePath());
}
protected HoodieFileReader createNewFileReader(HoodieBaseFile hoodieBaseFile) throws IOException {
- return HoodieFileReaderFactory.getReaderFactory(this.config.getRecordMerger().getRecordType())
+ return HoodieIOFactory.getIOFactory(storage.getConf()).getReaderFactory(this.config.getRecordMerger().getRecordType())
.getFileReader(config, hoodieTable.getStorageConf(), hoodieBaseFile.getStoragePath());
}
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
index 38383fd7a88..a13253bc1b0 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
@@ -37,7 +37,7 @@ import org.apache.hudi.internal.schema.utils.InternalSchemaUtils;
import org.apache.hudi.internal.schema.utils.SerDeHelper;
import org.apache.hudi.io.HoodieMergeHandle;
import org.apache.hudi.io.storage.HoodieFileReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.io.storage.HoodieIOFactory;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.HoodieTable;
@@ -80,7 +80,7 @@ public class HoodieMergeHelper<T> extends BaseMergeHelper {
StorageConfiguration<?> storageConf = table.getStorageConf().newInstance();
HoodieRecord.HoodieRecordType recordType = table.getConfig().getRecordMerger().getRecordType();
- HoodieFileReader baseFileReader = HoodieFileReaderFactory
+ HoodieFileReader baseFileReader = HoodieIOFactory.getIOFactory(storageConf)
.getReaderFactory(recordType)
.getFileReader(writeConfig, storageConf, mergeHandle.getOldFilePath());
HoodieFileReader bootstrapFileReader = null;
@@ -112,9 +112,10 @@ public class HoodieMergeHelper<T> extends BaseMergeHelper {
if (baseFile.getBootstrapBaseFile().isPresent()) {
StoragePath bootstrapFilePath = baseFile.getBootstrapBaseFile().get().getStoragePath();
StorageConfiguration<?> bootstrapFileConfig = table.getStorageConf().newInstance();
- bootstrapFileReader = HoodieFileReaderFactory.getReaderFactory(recordType).newBootstrapFileReader(
+ bootstrapFileReader = HoodieIOFactory.getIOFactory(storageConf).getReaderFactory(recordType).newBootstrapFileReader(
baseFileReader,
- HoodieFileReaderFactory.getReaderFactory(recordType).getFileReader(writeConfig, bootstrapFileConfig, bootstrapFilePath),
+ HoodieIOFactory.getIOFactory(storageConf).getReaderFactory(recordType)
+ .getFileReader(writeConfig, bootstrapFileConfig, bootstrapFilePath),
mergeHandle.getPartitionFields(),
mergeHandle.getPartitionValues());
recordSchema = mergeHandle.getWriterSchemaWithMetaFields();
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/GenericRecordValidationTestUtils.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/GenericRecordValidationTestUtils.java
index 9a51c6204b3..aba943d6d5a 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/GenericRecordValidationTestUtils.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/GenericRecordValidationTestUtils.java
@@ -30,7 +30,7 @@ import org.apache.hudi.exception.HoodieValidationException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
import org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.io.storage.HoodieIOFactory;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
@@ -145,9 +145,10 @@ public class GenericRecordValidationTestUtils {
public static Stream<GenericRecord> readHFile(Configuration conf, String[] paths) {
List<GenericRecord> valuesAsList = new LinkedList<>();
for (String path : paths) {
+ StorageConfiguration storageConf = HadoopFSUtils.getStorageConf(conf);
try (HoodieAvroHFileReaderImplBase reader = (HoodieAvroHFileReaderImplBase)
- HoodieFileReaderFactory.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO)
- .getFileReader(DEFAULT_HUDI_CONFIG_FOR_READER, HadoopFSUtils.getStorageConf(conf), new StoragePath(path), HoodieFileFormat.HFILE)) {
+ HoodieIOFactory.getIOFactory(storageConf).getReaderFactory(HoodieRecord.HoodieRecordType.AVRO)
+ .getFileReader(DEFAULT_HUDI_CONFIG_FOR_READER, storageConf, new StoragePath(path), HoodieFileFormat.HFILE)) {
valuesAsList.addAll(HoodieAvroHFileReaderImplBase.readAllRecords(reader)
.stream().map(e -> (GenericRecord) e).collect(Collectors.toList()));
} catch (IOException e) {
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
index 02021dcc405..5b216807932 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
@@ -43,7 +43,7 @@ import org.apache.hudi.execution.bulkinsert.JavaBulkInsertInternalPartitionerFac
import org.apache.hudi.execution.bulkinsert.JavaCustomColumnsSortPartitioner;
import org.apache.hudi.io.IOUtils;
import org.apache.hudi.io.storage.HoodieFileReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.io.storage.HoodieIOFactory;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieTable;
@@ -192,7 +192,7 @@ public abstract class JavaExecutionStrategy<T>
baseFileReader = StringUtils.isNullOrEmpty(clusteringOp.getDataFilePath())
? Option.empty()
- : Option.of(HoodieFileReaderFactory.getReaderFactory(recordType)
+ : Option.of(HoodieIOFactory.getIOFactory(table.getStorageConf()).getReaderFactory(recordType)
.getFileReader(config, table.getStorageConf(), new StoragePath(clusteringOp.getDataFilePath())));
HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig();
Iterator<HoodieRecord<T>> fileSliceReader = new HoodieFileSliceReader(baseFileReader, scanner, readerSchema, tableConfig.getPreCombineField(), writeConfig.getRecordMerger(),
@@ -221,7 +221,7 @@ public abstract class JavaExecutionStrategy<T>
private List<HoodieRecord<T>> readRecordsForGroupBaseFiles(List<ClusteringOperation> clusteringOps) {
List<HoodieRecord<T>> records = new ArrayList<>();
clusteringOps.forEach(clusteringOp -> {
- try (HoodieFileReader baseFileReader = HoodieFileReaderFactory.getReaderFactory(recordType)
+ try (HoodieFileReader baseFileReader = HoodieIOFactory.getIOFactory(getHoodieTable().getStorageConf()).getReaderFactory(recordType)
.getFileReader(getHoodieTable().getConfig(), getHoodieTable().getStorageConf(), new StoragePath(clusteringOp.getDataFilePath()))) {
Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(getWriteConfig().getSchema()));
Iterator<HoodieRecord> recordIterator = baseFileReader.getRecordIterator(readerSchema);
diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
index 2a5b6e33171..91d9566fa60 100644
--- a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
+++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
@@ -86,7 +86,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.io.storage.HoodieIOFactory;
import org.apache.hudi.metadata.FileSystemBackedTableMetadata;
import org.apache.hudi.metadata.HoodieBackedTableMetadata;
import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
@@ -549,7 +549,7 @@ public class TestJavaHoodieBackedMetadata extends TestHoodieMetadataBase {
List<FileSlice> fileSlices = table.getSliceView().getLatestFileSlices("files").collect(Collectors.toList());
HoodieBaseFile baseFile = fileSlices.get(0).getBaseFile().get();
HoodieAvroHFileReaderImplBase hoodieHFileReader = (HoodieAvroHFileReaderImplBase)
- HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO).getFileReader(
+ HoodieIOFactory.getIOFactory(context.getStorageConf()).getReaderFactory(HoodieRecordType.AVRO).getFileReader(
writeConfig, context.getStorageConf(), new StoragePath(baseFile.getPath()));
List<IndexedRecord> records = HoodieAvroHFileReaderImplBase.readAllRecords(hoodieHFileReader);
records.forEach(entry -> {
@@ -983,7 +983,7 @@ public class TestJavaHoodieBackedMetadata extends TestHoodieMetadataBase {
final HoodieBaseFile baseFile = fileSlices.get(0).getBaseFile().get();
HoodieAvroHFileReaderImplBase hoodieHFileReader = (HoodieAvroHFileReaderImplBase)
- HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO).getFileReader(
+ HoodieIOFactory.getIOFactory(storageConf).getReaderFactory(HoodieRecordType.AVRO).getFileReader(
table.getConfig(), context.getStorageConf(), new StoragePath(baseFile.getPath()));
List<IndexedRecord> records = HoodieAvroHFileReaderImplBase.readAllRecords(hoodieHFileReader);
records.forEach(entry -> {
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
index 739abe319b9..2331db1ff42 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
@@ -56,7 +56,6 @@ import org.apache.hudi.execution.bulkinsert.RowCustomColumnsSortPartitioner;
import org.apache.hudi.execution.bulkinsert.RowSpatialCurveSortPartitioner;
import org.apache.hudi.io.IOUtils;
import org.apache.hudi.io.storage.HoodieFileReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
import org.apache.hudi.storage.StorageConfiguration;
@@ -94,6 +93,7 @@ import java.util.stream.Stream;
import static org.apache.hudi.client.utils.SparkPartitionUtils.getPartitionFieldVals;
import static org.apache.hudi.common.config.HoodieCommonConfig.TIMESTAMP_AS_OF;
import static org.apache.hudi.config.HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS;
+import static org.apache.hudi.io.storage.HoodieSparkIOFactory.getHoodieSparkIOFactory;
/**
* Clustering strategy to submit multiple spark jobs and union the results.
@@ -385,7 +385,7 @@ public abstract class MultipleSparkJobExecutionStrategy<T>
private HoodieFileReader getBaseOrBootstrapFileReader(StorageConfiguration<?> storageConf, String bootstrapBasePath, Option<String[]> partitionFields, ClusteringOperation clusteringOp)
throws IOException {
- HoodieFileReader baseFileReader = HoodieFileReaderFactory.getReaderFactory(recordType)
+ HoodieFileReader baseFileReader = getHoodieSparkIOFactory().getReaderFactory(recordType)
.getFileReader(writeConfig, storageConf, new StoragePath(clusteringOp.getDataFilePath()));
// handle bootstrap path
if (StringUtils.nonEmpty(clusteringOp.getBootstrapFilePath()) && StringUtils.nonEmpty(bootstrapBasePath)) {
@@ -397,9 +397,9 @@ public abstract class MultipleSparkJobExecutionStrategy<T>
partitionValues = getPartitionFieldVals(partitionFields, partitionFilePath, bootstrapBasePath, baseFileReader.getSchema(), storageConf.unwrapAs(Configuration.class));
}
- baseFileReader = HoodieFileReaderFactory.getReaderFactory(recordType).newBootstrapFileReader(
+ baseFileReader = getHoodieSparkIOFactory().getReaderFactory(recordType).newBootstrapFileReader(
baseFileReader,
- HoodieFileReaderFactory.getReaderFactory(recordType).getFileReader(
+ getHoodieSparkIOFactory().getReaderFactory(recordType).getFileReader(
writeConfig, storageConf, new StoragePath(bootstrapFilePath)),
partitionFields,
partitionValues);
}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java
index 50eb9d4bd7a..06ba64dad89 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java
@@ -39,7 +39,6 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.exception.HoodieClusteringException;
import org.apache.hudi.io.storage.HoodieFileReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
import org.apache.hudi.storage.StoragePath;
@@ -64,6 +63,8 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
+import static org.apache.hudi.io.storage.HoodieSparkIOFactory.getHoodieSparkIOFactory;
+
/**
* Clustering strategy to submit single spark jobs.
* MultipleSparkJobExecution strategy is not ideal for use cases that require large number of clustering groups
@@ -146,7 +147,7 @@ public abstract class SingleSparkJobExecutionStrategy<T>
Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(getWriteConfig().getSchema()));
Iterable<HoodieRecord<T>> indexedRecords = () -> {
try {
- HoodieFileReader baseFileReader = HoodieFileReaderFactory.getReaderFactory(recordType)
+ HoodieFileReader baseFileReader = getHoodieSparkIOFactory().getReaderFactory(recordType)
.getFileReader(writeConfig, getHoodieTable().getStorageConf(), new StoragePath(clusteringOp.getDataFilePath()));
Option<BaseKeyGenerator> keyGeneratorOp = HoodieSparkKeyGeneratorFactory.createBaseKeyGenerator(writeConfig);
// NOTE: Record have to be cloned here to make sure if it holds low-level engine-specific
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkIOFactory.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkIOFactory.java
new file mode 100644
index 00000000000..16431d61551
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkIOFactory.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import org.apache.hudi.common.model.HoodieRecord;
+
+/**
+ * Creates readers and writers for SPARK and AVRO record payloads
+ */
+public class HoodieSparkIOFactory extends HoodieHadoopIOFactory {
+ private static final HoodieSparkIOFactory HOODIE_SPARK_IO_FACTORY = new HoodieSparkIOFactory();
+
+ public static HoodieSparkIOFactory getHoodieSparkIOFactory() {
+ return HOODIE_SPARK_IO_FACTORY;
+ }
+
+ @Override
+ public HoodieFileReaderFactory getReaderFactory(HoodieRecord.HoodieRecordType recordType) {
+ if (recordType == HoodieRecord.HoodieRecordType.SPARK) {
+ return new HoodieSparkFileReaderFactory();
+ }
+ return super.getReaderFactory(recordType);
+ }
+
+ @Override
+ public HoodieFileWriterFactory getWriterFactory(HoodieRecord.HoodieRecordType recordType) {
+ if (recordType == HoodieRecord.HoodieRecordType.SPARK) {
+ return new HoodieSparkFileWriterFactory();
+ }
+ return super.getWriterFactory(recordType);
+ }
+}
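A hedged usage sketch of the class above (the wrapper class and method names
are illustrative; only the factory calls come from this patch). Spark-side
callers in this commit obtain engine-specific factories through the singleton:

    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.io.storage.HoodieFileReaderFactory;

    import static org.apache.hudi.io.storage.HoodieSparkIOFactory.getHoodieSparkIOFactory;

    class SparkFactorySketch {
      static HoodieFileReaderFactory sparkReaderFactory() {
        // SPARK record type resolves to HoodieSparkFileReaderFactory;
        // AVRO falls through to the HoodieHadoopIOFactory parent.
        return getHoodieSparkIOFactory().getReaderFactory(HoodieRecord.HoodieRecordType.SPARK);
      }
    }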
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java
index 151e88432e3..adc6a456ac9 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java
@@ -31,7 +31,6 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.io.HoodieBootstrapHandle;
import org.apache.hudi.io.storage.HoodieFileReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.keygen.KeyGeneratorInterface;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.HoodieTable;
@@ -58,6 +57,7 @@ import java.io.IOException;
import java.util.function.Function;
import static org.apache.hudi.io.HoodieBootstrapHandle.METADATA_BOOTSTRAP_RECORD_SCHEMA;
+import static org.apache.hudi.io.storage.HoodieSparkIOFactory.getHoodieSparkIOFactory;
class ParquetBootstrapMetadataHandler extends BaseBootstrapMetadataHandler {
@@ -82,7 +82,7 @@ class ParquetBootstrapMetadataHandler extends BaseBootstrapMetadataHandler {
Schema schema) throws Exception {
HoodieRecord.HoodieRecordType recordType = table.getConfig().getRecordMerger().getRecordType();
- HoodieFileReader reader = HoodieFileReaderFactory.getReaderFactory(recordType)
+ HoodieFileReader reader = getHoodieSparkIOFactory().getReaderFactory(recordType)
.getFileReader(table.getConfig(), table.getStorageConf(), sourceFilePath);
HoodieExecutor<Void> executor = null;
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index 2c145f5b10e..7c84efe6bf6 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -90,7 +90,6 @@ import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.metadata.FileSystemBackedTableMetadata;
import org.apache.hudi.metadata.HoodieBackedTableMetadata;
import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
@@ -172,6 +171,7 @@ import static org.apache.hudi.common.table.timeline.HoodieTimeline.ROLLBACK_ACTI
import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.getNextCommitTime;
import static org.apache.hudi.config.HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS;
+import static org.apache.hudi.io.storage.HoodieSparkIOFactory.getHoodieSparkIOFactory;
import static org.apache.hudi.metadata.HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP;
import static org.apache.hudi.metadata.HoodieTableMetadata.getMetadataTableBasePath;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.deleteMetadataTable;
@@ -826,7 +826,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
List<FileSlice> fileSlices = table.getSliceView().getLatestFileSlices("files").collect(Collectors.toList());
HoodieBaseFile baseFile = fileSlices.get(0).getBaseFile().get();
HoodieAvroHFileReaderImplBase hoodieHFileReader = (HoodieAvroHFileReaderImplBase)
- HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO).getFileReader(
+ getHoodieSparkIOFactory().getReaderFactory(HoodieRecordType.AVRO).getFileReader(
table.getConfig(), context.getStorageConf(), new StoragePath(baseFile.getPath()));
List<IndexedRecord> records = HoodieAvroHFileReaderImplBase.readAllRecords(hoodieHFileReader);
records.forEach(entry -> {
@@ -1449,9 +1449,9 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
}
final HoodieBaseFile baseFile = fileSlices.get(0).getBaseFile().get();
- HoodieAvroHFileReaderImplBase hoodieHFileReader = (HoodieAvroHFileReaderImplBase)
- HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO).getFileReader(
- table.getConfig(), context.getStorageConf(), new StoragePath(baseFile.getPath()));
+ HoodieAvroHFileReaderImplBase hoodieHFileReader = (HoodieAvroHFileReaderImplBase) getHoodieSparkIOFactory()
+ .getReaderFactory(HoodieRecordType.AVRO)
+ .getFileReader(table.getConfig(), context.getStorageConf(), new StoragePath(baseFile.getPath()));
List<IndexedRecord> records = HoodieAvroHFileReaderImplBase.readAllRecords(hoodieHFileReader);
records.forEach(entry -> {
if (enableMetaFields) {
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
index 9e8521d669b..f711636d514 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
@@ -46,7 +46,6 @@ import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.metadata.HoodieBackedTableMetadata;
import org.apache.hudi.metadata.HoodieMetadataLogRecordReader;
import org.apache.hudi.metadata.HoodieMetadataPayload;
@@ -87,6 +86,7 @@ import static org.apache.hudi.common.model.WriteOperationType.COMPACT;
import static org.apache.hudi.common.model.WriteOperationType.INSERT;
import static org.apache.hudi.common.model.WriteOperationType.UPSERT;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.CLEAN_ACTION;
+import static org.apache.hudi.io.storage.HoodieSparkIOFactory.getHoodieSparkIOFactory;
import static org.apache.hudi.metadata.MetadataPartitionType.FILES;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -527,7 +527,7 @@ public class TestHoodieBackedTableMetadata extends TestHoodieMetadataBase {
final HoodieBaseFile baseFile = fileSlices.get(0).getBaseFile().get();
HoodieAvroHFileReaderImplBase hoodieHFileReader = (HoodieAvroHFileReaderImplBase)
- HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO).getFileReader(
+ getHoodieSparkIOFactory().getReaderFactory(HoodieRecordType.AVRO).getFileReader(
table.getConfig(), context.getStorageConf(), new StoragePath(baseFile.getPath()));
List<IndexedRecord> records = HoodieAvroHFileReaderImplBase.readAllRecords(hoodieHFileReader);
records.forEach(entry -> {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java
index f3ad183def4..0309aee00a9 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java
@@ -235,6 +235,14 @@ public class HoodieStorageConfig extends HoodieConfig {
+ "and it is loaded at runtime. This is only required when trying to
"
+ "override the existing write context when
`hoodie.datasource.write.row.writer.enable=true`.");
+ public static final ConfigProperty<String> HOODIE_IO_FACTORY_CLASS =
ConfigProperty
+ .key("hoodie.io.factory.class")
+ .defaultValue("org.apache.hudi.io.storage.HoodieHadoopIOFactory")
+ .markAdvanced()
+ .sinceVersion("0.15.0")
+ .withDocumentation("The fully-qualified class name of the factory class
to return readers and writers of files used "
+ + "by Hudi. The provided class should implement
`org.apache.hudi.io.storage.HoodieIOFactory`.");
+
/**
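A hedged sketch of pointing the new config at a custom factory; the class
name below is hypothetical and must implement
org.apache.hudi.io.storage.HoodieIOFactory:

    import java.util.Properties;

    import org.apache.hudi.common.config.HoodieStorageConfig;

    class IOFactoryConfigSketch {
      static Properties withCustomIOFactory() {
        Properties props = new Properties();
        // Key is "hoodie.io.factory.class"; the default is
        // org.apache.hudi.io.storage.HoodieHadoopIOFactory.
        props.setProperty(HoodieStorageConfig.HOODIE_IO_FACTORY_CLASS.key(),
            "com.example.CustomIOFactory"); // hypothetical implementation
        return props;
      }
    }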
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
index e63a1f9872a..3d6e2a81b0b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
@@ -33,7 +33,7 @@ import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.io.SeekableDataInputStream;
import org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase;
import org.apache.hudi.io.storage.HoodieFileReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.io.storage.HoodieIOFactory;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.HoodieStorageUtils;
import org.apache.hudi.storage.StorageConfiguration;
@@ -192,11 +192,10 @@ public class HoodieHFileDataBlock extends HoodieDataBlock {
StorageConfiguration<?> storageConf = getBlockContentLocation().get().getStorageConf().getInline();
HoodieStorage storage = HoodieStorageUtils.getStorage(pathForReader, storageConf);
// Read the content
- try (HoodieFileReader reader =
- HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO).getContentReader(
-
- hFileReaderConfig, storageConf, pathForReader, HoodieFileFormat.HFILE, storage, content,
- Option.of(getSchemaFromHeader()))) {
+ try (HoodieFileReader reader = HoodieIOFactory.getIOFactory(storageConf)
+ .getReaderFactory(HoodieRecordType.AVRO)
+ .getContentReader(hFileReaderConfig, storageConf, pathForReader,
+ HoodieFileFormat.HFILE, storage, content, Option.of(getSchemaFromHeader()))) {
return unsafeCast(reader.getRecordIterator(readerSchema));
}
}
@@ -209,7 +208,7 @@ public class HoodieHFileDataBlock extends HoodieDataBlock {
HoodieStorage storage = HoodieStorageUtils.getStorage(pathForReader, storageConf);
// Read the content
try (HoodieAvroHFileReaderImplBase reader = (HoodieAvroHFileReaderImplBase)
- HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO).getContentReader(
+ HoodieIOFactory.getIOFactory(storageConf).getReaderFactory(HoodieRecordType.AVRO).getContentReader(
hFileReaderConfig, storageConf, pathForReader, HoodieFileFormat.HFILE, storage, content, Option.of(getSchemaFromHeader()))) {
return unsafeCast(reader.getIndexedRecordIterator(readerSchema, readerSchema));
@@ -231,10 +230,9 @@ public class HoodieHFileDataBlock extends HoodieDataBlock {
blockContentLoc.getContentPositionInLogFile(),
blockContentLoc.getBlockSize());
- try (final HoodieAvroHFileReaderImplBase reader = (HoodieAvroHFileReaderImplBase)
- HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO).getFileReader(
- hFileReaderConfig, inlineConf, inlinePath, HoodieFileFormat.HFILE,
- Option.of(getSchemaFromHeader()))) {
+ try (final HoodieAvroHFileReaderImplBase reader = (HoodieAvroHFileReaderImplBase) HoodieIOFactory.getIOFactory(inlineConf)
+ .getReaderFactory(HoodieRecordType.AVRO)
+ .getFileReader(hFileReaderConfig, inlineConf, inlinePath, HoodieFileFormat.HFILE, Option.of(getSchemaFromHeader()))) {
// Get writer's schema from the header
final ClosableIterator<HoodieRecord<IndexedRecord>> recordIterator =
fullKey ? reader.getRecordsByKeysIterator(sortedKeys, readerSchema)
: reader.getRecordsByKeyPrefixIterator(sortedKeys, readerSchema);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
index 2997390dc34..c29b396b6aa 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
@@ -26,9 +26,9 @@ import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.io.SeekableDataInputStream;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.io.storage.HoodieFileWriter;
import org.apache.hudi.io.storage.HoodieFileWriterFactory;
+import org.apache.hudi.io.storage.HoodieIOFactory;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.inline.InLineFSUtils;
@@ -145,7 +145,7 @@ public class HoodieParquetDataBlock extends HoodieDataBlock {
Schema writerSchema = new Schema.Parser().parse(this.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
- ClosableIterator<HoodieRecord<T>> iterator = HoodieFileReaderFactory.getReaderFactory(type)
+ ClosableIterator<HoodieRecord<T>> iterator = HoodieIOFactory.getIOFactory(inlineConf).getReaderFactory(type)
.getFileReader(DEFAULT_HUDI_CONFIG_FOR_READER, inlineConf, inlineLogFilePath, PARQUET, Option.empty())
.getRecordIterator(writerSchema, readerSchema);
return iterator;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
index 42f8a6a2753..28767c1047a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
@@ -26,7 +26,7 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.io.storage.HoodieAvroFileReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.io.storage.HoodieIOFactory;
import org.apache.hudi.storage.StoragePath;
import org.apache.avro.Schema;
@@ -266,7 +266,8 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
.filter(fileName -> filter == null || LSMTimeline.isFileInRange(filter, fileName))
.parallel().forEach(fileName -> {
// Read the archived file
- try (HoodieAvroFileReader reader = (HoodieAvroFileReader) HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO)
+ try (HoodieAvroFileReader reader = (HoodieAvroFileReader) HoodieIOFactory.getIOFactory(metaClient.getStorageConf())
+ .getReaderFactory(HoodieRecordType.AVRO)
.getFileReader(DEFAULT_HUDI_CONFIG_FOR_READER, metaClient.getStorageConf(), new StoragePath(metaClient.getArchivePath(), fileName))) {
try (ClosableIterator<IndexedRecord> iterator = reader.getIndexedRecordIterator(HoodieLSMTimelineInstant.getClassSchema(), readSchema)) {
while (iterator.hasNext()) {
while (iterator.hasNext()) {
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java
index c285f04a2b2..8637c468fdd 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java
@@ -22,10 +22,7 @@ import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.config.HoodieReaderConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieFileFormat;
-import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ReflectionUtils;
-import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
@@ -43,30 +40,6 @@ import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
*/
public class HoodieFileReaderFactory {
- public static HoodieFileReaderFactory getReaderFactory(HoodieRecord.HoodieRecordType recordType) {
- switch (recordType) {
- case AVRO:
-
- try {
- Class<?> clazz =
- ReflectionUtils.getClass("org.apache.hudi.io.hadoop.HoodieAvroFileReaderFactory");
- return (HoodieFileReaderFactory) clazz.newInstance();
- } catch (IllegalArgumentException | IllegalAccessException | InstantiationException e) {
- throw new HoodieException("Unable to create HoodieAvroFileReaderFactory", e);
- }
- case SPARK:
- try {
- Class<?> clazz =
- ReflectionUtils.getClass("org.apache.hudi.io.storage.HoodieSparkFileReaderFactory");
- return (HoodieFileReaderFactory) clazz.newInstance();
- } catch (IllegalArgumentException | IllegalAccessException | InstantiationException e) {
- throw new HoodieException("Unable to create HoodieSparkFileReaderFactory", e);
- }
- default:
- throw new UnsupportedOperationException(recordType + " record type not supported yet.");
- }
- }
-
public HoodieFileReader getFileReader(HoodieConfig hoodieConfig, StorageConfiguration<?> conf, StoragePath path) throws IOException {
final String extension = FSUtils.getFileExtension(path.toString());
if (PARQUET.getFileExtension().equals(extension)) {
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
index 69a8924f508..2218af0d426 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
@@ -25,10 +25,7 @@ import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieFileFormat;
-import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
-import org.apache.hudi.common.util.ReflectionUtils;
-import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
@@ -43,39 +40,18 @@ import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
public class HoodieFileWriterFactory {
- private static HoodieFileWriterFactory getWriterFactory(HoodieRecord.HoodieRecordType recordType) {
- switch (recordType) {
- case AVRO:
- try {
- Class<?> clazz = ReflectionUtils.getClass("org.apache.hudi.io.hadoop.HoodieAvroFileWriterFactory");
- return (HoodieFileWriterFactory) clazz.newInstance();
- } catch (IllegalAccessException | IllegalArgumentException | InstantiationException e) {
- throw new HoodieException("Unable to create HoodieAvroFileWriterFactory", e);
- }
- case SPARK:
- try {
- Class<?> clazz = ReflectionUtils.getClass("org.apache.hudi.io.storage.HoodieSparkFileWriterFactory");
- return (HoodieFileWriterFactory) clazz.newInstance();
- } catch (IllegalAccessException | IllegalArgumentException | InstantiationException e) {
- throw new HoodieException("Unable to create HoodieSparkFileWriterFactory", e);
- }
- default:
- throw new UnsupportedOperationException(recordType + " record type not supported yet.");
- }
- }
-
public static <T, I, K, O> HoodieFileWriter getFileWriter(
String instantTime, StoragePath path, StorageConfiguration<?> conf, HoodieConfig config, Schema schema,
TaskContextSupplier taskContextSupplier, HoodieRecordType recordType) throws IOException {
final String extension = FSUtils.getFileExtension(path.getName());
- HoodieFileWriterFactory factory = getWriterFactory(recordType);
+ HoodieFileWriterFactory factory = HoodieIOFactory.getIOFactory(conf).getWriterFactory(recordType);
return factory.getFileWriterByFormat(extension, instantTime, path, conf, config, schema, taskContextSupplier);
}
public static <T, I, K, O> HoodieFileWriter getFileWriter(HoodieFileFormat format, OutputStream outputStream,
StorageConfiguration<?> conf, HoodieConfig config, Schema schema, HoodieRecordType recordType)
throws IOException {
- HoodieFileWriterFactory factory = getWriterFactory(recordType);
+ HoodieFileWriterFactory factory = HoodieIOFactory.getIOFactory(conf).getWriterFactory(recordType);
return factory.getFileWriterByFormat(format, outputStream, conf, config, schema);
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieIOFactory.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieIOFactory.java
new file mode 100644
index 00000000000..3e715366134
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieIOFactory.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import org.apache.hudi.common.config.HoodieStorageConfig;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.storage.StorageConfiguration;
+
+/**
+ * Base class to get HoodieFileReaderFactory and HoodieFileWriterFactory
+ */
+public abstract class HoodieIOFactory {
+
+ public static HoodieIOFactory getIOFactory(StorageConfiguration<?> storageConf) {
+ String ioFactoryClass = storageConf.getString(HoodieStorageConfig.HOODIE_IO_FACTORY_CLASS.key())
+ .orElse(HoodieStorageConfig.HOODIE_IO_FACTORY_CLASS.defaultValue());
+ return getIOFactory(ioFactoryClass);
+ }
+
+ private static HoodieIOFactory getIOFactory(String ioFactoryClass) {
+ try {
+ return ReflectionUtils.loadClass(ioFactoryClass);
+ } catch (Exception e) {
+ throw new HoodieException("Unable to create " + ioFactoryClass, e);
+ }
+ }
+
+ public abstract HoodieFileReaderFactory getReaderFactory(HoodieRecord.HoodieRecordType recordType);
+
+ public abstract HoodieFileWriterFactory getWriterFactory(HoodieRecord.HoodieRecordType recordType);
+
+}
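A hedged sketch of a custom implementation against the contract above; the
com.example package and the instrumentation comment are hypothetical. Because
the class is loaded reflectively via hoodie.io.factory.class, it needs a
public no-arg constructor:

    package com.example;

    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.io.storage.HoodieFileReaderFactory;
    import org.apache.hudi.io.storage.HoodieFileWriterFactory;
    import org.apache.hudi.io.storage.HoodieHadoopIOFactory;

    // Extends the shipped Hadoop factory and overrides only what differs.
    public class CustomIOFactory extends HoodieHadoopIOFactory {
      @Override
      public HoodieFileReaderFactory getReaderFactory(HoodieRecord.HoodieRecordType recordType) {
        // e.g. wrap the delegate's readers with instrumentation here
        return super.getReaderFactory(recordType);
      }

      @Override
      public HoodieFileWriterFactory getWriterFactory(HoodieRecord.HoodieRecordType recordType) {
        return super.getWriterFactory(recordType);
      }
    }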
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index 9a525a8142c..c4510a3edac 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -50,7 +50,7 @@ import org.apache.hudi.exception.TableNotFoundException;
import org.apache.hudi.expression.BindVisitor;
import org.apache.hudi.expression.Expression;
import org.apache.hudi.internal.schema.Types;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.io.storage.HoodieIOFactory;
import org.apache.hudi.io.storage.HoodieSeekingFileReader;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.util.Transient;
@@ -606,7 +606,7 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
Option<HoodieBaseFile> basefile = slice.getBaseFile();
if (basefile.isPresent()) {
StoragePath baseFilePath = basefile.get().getStoragePath();
- baseFileReader = (HoodieSeekingFileReader<?>) HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO)
+ baseFileReader = (HoodieSeekingFileReader<?>) HoodieIOFactory.getIOFactory(storageConf).getReaderFactory(HoodieRecordType.AVRO)
.getFileReader(DEFAULT_HUDI_CONFIG_FOR_READER, getStorageConf(), baseFilePath);
baseFileOpenMs = timer.endTimer();
LOG.info(String.format("Opened metadata base file from %s at instant %s in %d ms", baseFilePath,
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 3484fe8ae57..12dc1b33ecb 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -82,7 +82,7 @@ import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.io.storage.HoodieFileReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.io.storage.HoodieIOFactory;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.HoodieStorageUtils;
import org.apache.hudi.storage.StorageConfiguration;
@@ -516,9 +516,9 @@ public class HoodieTableMetadataUtil {
}
final StoragePath writeFilePath = new StoragePath(dataMetaClient.getBasePathV2(), pathWithPartition);
- try (HoodieFileReader fileReader =
- HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO).getFileReader(
- hoodieConfig, dataMetaClient.getStorageConf(), writeFilePath)) {
+ try (HoodieFileReader fileReader = HoodieIOFactory.getIOFactory(dataMetaClient.getStorageConf())
+ .getReaderFactory(HoodieRecordType.AVRO).getFileReader(hoodieConfig,
+ dataMetaClient.getStorageConf(), writeFilePath)) {
try {
final BloomFilter fileBloomFilter = fileReader.readBloomFilter();
if (fileBloomFilter == null) {
@@ -961,7 +961,7 @@ public class HoodieTableMetadataUtil {
private static ByteBuffer readBloomFilter(StorageConfiguration<?> conf, StoragePath filePath) throws IOException {
HoodieConfig hoodieConfig = getReaderConfigs(conf);
- try (HoodieFileReader fileReader = HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO)
+ try (HoodieFileReader fileReader = HoodieIOFactory.getIOFactory(conf).getReaderFactory(HoodieRecordType.AVRO)
.getFileReader(hoodieConfig, conf, filePath)) {
final BloomFilter fileBloomFilter = fileReader.readBloomFilter();
if (fileBloomFilter == null) {
@@ -1764,7 +1764,7 @@ public class HoodieTableMetadataUtil {
final String fileId = baseFile.getFileId();
final String instantTime = baseFile.getCommitTime();
- HoodieFileReader reader = HoodieFileReaderFactory.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO)
+ HoodieFileReader reader = HoodieIOFactory.getIOFactory(configuration).getReaderFactory(HoodieRecord.HoodieRecordType.AVRO)
.getFileReader(config, configuration, dataFilePath);
return getHoodieRecordIterator(reader.getRecordKeyIterator(), forDelete, partition, fileId, instantTime);
});
@@ -1825,7 +1825,7 @@ public class HoodieTableMetadataUtil {
final String fileId = baseFile.getFileId();
final String instantTime = baseFile.getCommitTime();
HoodieConfig hoodieConfig = getReaderConfigs(storageConf);
- HoodieFileReader reader = HoodieFileReaderFactory.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO)
+ HoodieFileReader reader = HoodieIOFactory.getIOFactory(storageConf).getReaderFactory(HoodieRecord.HoodieRecordType.AVRO)
.getFileReader(hoodieConfig, storageConf, dataFilePath);
return getHoodieRecordIterator(reader.getRecordKeyIterator(), forDelete, partition, fileId, instantTime);
});
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieTestReaderContext.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieTestReaderContext.java
index 6eb6733b04b..0266c036cf5 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieTestReaderContext.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieTestReaderContext.java
@@ -35,7 +35,7 @@ import org.apache.hudi.common.util.SpillableMapUtils;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.io.storage.HoodieAvroFileReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.io.storage.HoodieIOFactory;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.HoodieStorageUtils;
import org.apache.hudi.storage.StorageConfiguration;
@@ -81,7 +81,7 @@ public class HoodieTestReaderContext extends HoodieReaderContext<IndexedRecord>
Schema requiredSchema,
StorageConfiguration<?> conf
) throws IOException {
- HoodieAvroFileReader reader = (HoodieAvroFileReader) HoodieFileReaderFactory
+ HoodieAvroFileReader reader = (HoodieAvroFileReader) HoodieIOFactory.getIOFactory(conf)
.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO).getFileReader(new HoodieConfig(),
conf, filePath, HoodieFileFormat.PARQUET, Option.empty());
return reader.getIndexedRecordIterator(dataSchema, requiredSchema);
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
index 93a2f5d45d2..3709c27a8b8 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
@@ -44,6 +44,7 @@ import org.apache.hudi.io.IOUtils;
import org.apache.hudi.io.storage.HoodieAvroFileReader;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.io.storage.HoodieIOFactory;
import org.apache.hudi.metrics.FlinkClusteringMetrics;
import org.apache.hudi.sink.bulk.BulkInsertWriterHelper;
import org.apache.hudi.sink.bulk.sort.SortOperatorGen;
@@ -273,7 +274,8 @@ public class ClusteringOperator extends TableStreamOperator<ClusteringCommitEven
try {
Option<HoodieFileReader> baseFileReader = StringUtils.isNullOrEmpty(clusteringOp.getDataFilePath())
? Option.empty()
- : Option.of(HoodieFileReaderFactory.getReaderFactory(table.getConfig().getRecordMerger().getRecordType())
+ : Option.of(HoodieIOFactory.getIOFactory(table.getStorageConf())
+ .getReaderFactory(table.getConfig().getRecordMerger().getRecordType())
.getFileReader(table.getConfig(), table.getStorageConf(), new StoragePath(clusteringOp.getDataFilePath())));
HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
.withStorage(table.getMetaClient().getStorage())
@@ -320,7 +322,8 @@ public class ClusteringOperator extends TableStreamOperator<ClusteringCommitEven
List<Iterator<RowData>> iteratorsForPartition = clusteringOps.stream().map(clusteringOp -> {
Iterable<IndexedRecord> indexedRecords = () -> {
try {
- HoodieFileReaderFactory fileReaderFactory = HoodieFileReaderFactory.getReaderFactory(table.getConfig().getRecordMerger().getRecordType());
+ HoodieFileReaderFactory fileReaderFactory = HoodieIOFactory.getIOFactory(table.getStorageConf())
+ .getReaderFactory(table.getConfig().getRecordMerger().getRecordType());
HoodieAvroFileReader fileReader = (HoodieAvroFileReader) fileReaderFactory.getFileReader(
table.getConfig(), table.getStorageConf(), new StoragePath(clusteringOp.getDataFilePath()));
diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/HFileUtils.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/HFileUtils.java
index 48f7e41e047..f8177a869b1 100644
--- a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/HFileUtils.java
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/HFileUtils.java
@@ -27,7 +27,7 @@ import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.io.storage.HoodieFileReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.io.storage.HoodieIOFactory;
 import org.apache.hudi.keygen.BaseKeyGenerator;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StorageConfiguration;
@@ -101,7 +101,8 @@ public class HFileUtils extends BaseFileUtils {
     LOG.info("Reading schema from {}", filePath);
     try (HoodieFileReader fileReader =
-             HoodieFileReaderFactory.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO)
+             HoodieIOFactory.getIOFactory(configuration)
+                 .getReaderFactory(HoodieRecord.HoodieRecordType.AVRO)
              .getFileReader(
                  ConfigUtils.DEFAULT_HUDI_CONFIG_FOR_READER,
                  configuration,
diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/HoodieHadoopIOFactory.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/HoodieHadoopIOFactory.java
new file mode 100644
index 00000000000..65c8d028adb
--- /dev/null
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/HoodieHadoopIOFactory.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.io.hadoop.HoodieAvroFileReaderFactory;
+import org.apache.hudi.io.hadoop.HoodieAvroFileWriterFactory;
+
+/**
+ * Creates readers and writers for AVRO record payloads.
+ * Currently uses reflection to support SPARK record payloads, but
+ * this ability should be removed with [HUDI-7746].
+ */
+public class HoodieHadoopIOFactory extends HoodieIOFactory {
+
+  @Override
+  public HoodieFileReaderFactory getReaderFactory(HoodieRecord.HoodieRecordType recordType) {
+    switch (recordType) {
+      case AVRO:
+        return new HoodieAvroFileReaderFactory();
+      case SPARK:
+        // TODO: remove this case [HUDI-7746]
+        try {
+          return ReflectionUtils.loadClass("org.apache.hudi.io.storage.HoodieSparkFileReaderFactory");
+        } catch (Exception e) {
+          throw new HoodieException("Unable to create HoodieSparkFileReaderFactory", e);
+        }
+      default:
+        throw new UnsupportedOperationException(recordType + " record type not supported");
+    }
+  }
+
+  @Override
+  public HoodieFileWriterFactory getWriterFactory(HoodieRecord.HoodieRecordType recordType) {
+    switch (recordType) {
+      case AVRO:
+        return new HoodieAvroFileWriterFactory();
+      case SPARK:
+        // TODO: remove this case [HUDI-7746]
+        try {
+          return ReflectionUtils.loadClass("org.apache.hudi.io.storage.HoodieSparkFileWriterFactory");
+        } catch (Exception e) {
+          throw new HoodieException("Unable to create HoodieSparkFileWriterFactory", e);
+        }
+      default:
+        throw new UnsupportedOperationException(recordType + " record type not supported");
+    }
+  }
+}
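
Note: because HoodieIOFactory.getIOFactory resolves the concrete factory from the storage configuration, an engine can substitute its own implementation. Below is a minimal sketch of such a subclass, assuming the two factory methods above are the only abstract members of HoodieIOFactory at this commit; MyEngineIOFactory is hypothetical and not part of this change.

import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.io.hadoop.HoodieAvroFileReaderFactory;
import org.apache.hudi.io.hadoop.HoodieAvroFileWriterFactory;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.io.storage.HoodieFileWriterFactory;
import org.apache.hudi.io.storage.HoodieIOFactory;

// Hypothetical engine-specific factory that supports only AVRO payloads.
public class MyEngineIOFactory extends HoodieIOFactory {

  @Override
  public HoodieFileReaderFactory getReaderFactory(HoodieRecord.HoodieRecordType recordType) {
    if (recordType == HoodieRecord.HoodieRecordType.AVRO) {
      // Delegate AVRO handling to the Hadoop-based reader factory.
      return new HoodieAvroFileReaderFactory();
    }
    throw new UnsupportedOperationException(recordType + " record type not supported");
  }

  @Override
  public HoodieFileWriterFactory getWriterFactory(HoodieRecord.HoodieRecordType recordType) {
    if (recordType == HoodieRecord.HoodieRecordType.AVRO) {
      // Delegate AVRO handling to the Hadoop-based writer factory.
      return new HoodieAvroFileWriterFactory();
    }
    throw new UnsupportedOperationException(recordType + " record type not supported");
  }
}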
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieAvroFileReaderFactory.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieAvroFileReaderFactory.java
index 7faf84a1ee5..85731674cd6 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieAvroFileReaderFactory.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieAvroFileReaderFactory.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.io.storage.HoodieFileReader;
 import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.io.storage.HoodieIOFactory;
 import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
@@ -48,7 +49,7 @@ public class TestHoodieAvroFileReaderFactory {
     // parquet file format.
     final StorageConfiguration<?> storageConf = HadoopFSUtils.getStorageConf(new Configuration());
     final StoragePath parquetPath = new StoragePath("/partition/path/f1_1-0-1_000.parquet");
-    HoodieFileReader parquetReader = HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO)
+    HoodieFileReader parquetReader = HoodieIOFactory.getIOFactory(storageConf).getReaderFactory(HoodieRecordType.AVRO)
         .getFileReader(DEFAULT_HUDI_CONFIG_FOR_READER, storageConf, parquetPath);
     assertTrue(parquetReader instanceof HoodieAvroParquetReader);
@@ -56,14 +57,15 @@ public class TestHoodieAvroFileReaderFactory {
     final StoragePath logPath = new StoragePath(
         "/partition/path/f.b51192a8-574b-4a85-b246-bcfec03ac8bf_100.log.2_1-0-1");
     final Throwable thrown = assertThrows(UnsupportedOperationException.class, () -> {
-      HoodieFileReader logWriter = HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO)
+      HoodieFileReader logWriter = HoodieIOFactory.getIOFactory(storageConf).getReaderFactory(HoodieRecordType.AVRO)
           .getFileReader(DEFAULT_HUDI_CONFIG_FOR_READER, storageConf, logPath);
     }, "should fail since log storage reader is not supported yet.");
     assertTrue(thrown.getMessage().contains("format not supported yet."));
     // Orc file format.
     final StoragePath orcPath = new StoragePath("/partition/path/f1_1-0-1_000.orc");
-    HoodieFileReader orcReader = HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO)
+    HoodieFileReader orcReader = HoodieIOFactory.getIOFactory(storageConf)
+        .getReaderFactory(HoodieRecordType.AVRO)
         .getFileReader(DEFAULT_HUDI_CONFIG_FOR_READER, storageConf, orcPath);
     assertTrue(orcReader instanceof HoodieAvroOrcReader);
   }
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieOrcReaderWriter.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieOrcReaderWriter.java
index 6a94a32ed3c..0cf0ca9d445 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieOrcReaderWriter.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieOrcReaderWriter.java
@@ -28,7 +28,7 @@ import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.io.storage.HoodieAvroFileReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.io.storage.HoodieIOFactory;
 import org.apache.hudi.io.storage.HoodieOrcConfig;
 import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
@@ -78,7 +78,7 @@ public class TestHoodieOrcReaderWriter extends TestHoodieReaderWriterBase {
   @Override
   protected HoodieAvroFileReader createReader(
       StorageConfiguration<?> conf) throws Exception {
-    return (HoodieAvroFileReader) HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO)
+    return (HoodieAvroFileReader) HoodieIOFactory.getIOFactory(conf).getReaderFactory(HoodieRecordType.AVRO)
         .getFileReader(DEFAULT_HUDI_CONFIG_FOR_READER, conf, getFilePath());
   }
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileRecordReader.java
index 97177ab260d..85e9fcac311 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileRecordReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileRecordReader.java
@@ -26,7 +26,8 @@ import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
 import org.apache.hudi.io.storage.HoodieFileReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.io.storage.HoodieIOFactory;
+import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.avro.Schema;
@@ -56,8 +57,9 @@ public class HoodieHFileRecordReader implements RecordReader<NullWritable, Array
   public HoodieHFileRecordReader(Configuration conf, InputSplit split, JobConf job) throws IOException {
     FileSplit fileSplit = (FileSplit) split;
     StoragePath path = convertToStoragePath(fileSplit.getPath());
-    HoodieConfig hoodieConfig = getReaderConfigs(HadoopFSUtils.getStorageConf(conf));
-    reader = HoodieFileReaderFactory.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO)
+    StorageConfiguration<?> storageConf = HadoopFSUtils.getStorageConf(conf);
+    HoodieConfig hoodieConfig = getReaderConfigs(storageConf);
+    reader = HoodieIOFactory.getIOFactory(storageConf).getReaderFactory(HoodieRecord.HoodieRecordType.AVRO)
         .getFileReader(hoodieConfig, HadoopFSUtils.getStorageConf(conf), path, HoodieFileFormat.HFILE, Option.empty());
     schema = reader.getSchema();
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
index a66f3264e33..a2951973755 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
@@ -26,7 +26,8 @@ import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.io.storage.HoodieFileReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.io.storage.HoodieIOFactory;
+import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.avro.JsonProperties;
 import org.apache.avro.LogicalType;
@@ -309,8 +310,9 @@ public class HoodieRealtimeRecordReaderUtils {
   }
   public static HoodieFileReader getBaseFileReader(Path path, JobConf conf) throws IOException {
-    HoodieConfig hoodieConfig = getReaderConfigs(HadoopFSUtils.getStorageConf(conf));
-    return HoodieFileReaderFactory.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO)
+    StorageConfiguration<?> storageConf = HadoopFSUtils.getStorageConf(conf);
+    HoodieConfig hoodieConfig = getReaderConfigs(storageConf);
+    return HoodieIOFactory.getIOFactory(storageConf).getReaderFactory(HoodieRecord.HoodieRecordType.AVRO)
         .getFileReader(hoodieConfig, HadoopFSUtils.getStorageConf(conf), convertToStoragePath(path));
   }
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
index aa2e277edc9..59e04972692 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
@@ -43,7 +43,7 @@ import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.CloseableMappingIterator;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.io.storage.HoodieAvroFileReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.io.storage.HoodieIOFactory;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
@@ -274,7 +274,8 @@ public class DFSHoodieDatasetInputReader extends DFSDeltaInputReader {
     if (fileSlice.getBaseFile().isPresent()) {
       // Read the base files using the latest writer schema.
       Schema schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(schemaStr));
-      HoodieAvroFileReader reader = TypeUtils.unsafeCast(HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO)
+      HoodieAvroFileReader reader = TypeUtils.unsafeCast(HoodieIOFactory.getIOFactory(metaClient.getStorageConf())
+          .getReaderFactory(HoodieRecordType.AVRO)
           .getFileReader(
               DEFAULT_HUDI_CONFIG_FOR_READER,
               metaClient.getStorageConf(),
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
index c546e662b19..97a6016f873 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -21,7 +21,7 @@ import org.apache.hudi.DataSourceReadOptions._
 import org.apache.hudi.DataSourceWriteOptions.{BOOTSTRAP_OPERATION_OPT_VAL, OPERATION, STREAMING_CHECKPOINT_IDENTIFIER}
 import org.apache.hudi.cdc.CDCRelation
 import org.apache.hudi.common.HoodieSchemaNotFoundException
-import org.apache.hudi.common.config.HoodieReaderConfig
+import org.apache.hudi.common.config.{HoodieReaderConfig, HoodieStorageConfig}
 import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE, MERGE_ON_READ}
 import org.apache.hudi.common.model.WriteConcurrencyMode
 import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
@@ -31,6 +31,7 @@ import org.apache.hudi.config.HoodieBootstrapConfig.DATA_QUERIES_ONLY
 import org.apache.hudi.config.HoodieWriteConfig.WRITE_CONCURRENCY_MODE
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.hadoop.fs.HadoopFSUtils
+import org.apache.hudi.io.storage.HoodieSparkIOFactory
 import org.apache.hudi.storage.{HoodieStorageUtils, StoragePath}
 import org.apache.hudi.util.PathUtils
@@ -64,6 +65,9 @@ class DefaultSource extends RelationProvider
       // Enable "passPartitionByAsOptions" to support "write.partitionBy(...)"
       spark.conf.set("spark.sql.legacy.sources.write.passPartitionByAsOptions", "true")
     }
+    // Always use the Spark IO factory
+    spark.sparkContext.hadoopConfiguration.set(HoodieStorageConfig.HOODIE_IO_FACTORY_CLASS.key(),
+      classOf[HoodieSparkIOFactory].getName)
     // Revisit EMRFS incompatibilities, for now disable
     spark.sparkContext.hadoopConfiguration.set("fs.s3.metadata.cache.expiration.seconds", "0")
   }
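
Note: the override above is Spark-specific, but the same knob is open to any caller: a HoodieIOFactory subclass can be selected through the new HoodieStorageConfig.HOODIE_IO_FACTORY_CLASS option. Below is a minimal Java sketch; picking HoodieHadoopIOFactory here is only an example, and the key string is read from the config constant rather than hard-coded.

import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.io.storage.HoodieHadoopIOFactory;

public class IOFactoryConfigSketch {
  public static void main(String[] args) {
    Configuration hadoopConf = new Configuration();
    // Route reader/writer creation through the Hadoop-based factory (example choice).
    hadoopConf.set(HoodieStorageConfig.HOODIE_IO_FACTORY_CLASS.key(),
        HoodieHadoopIOFactory.class.getName());
  }
}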
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index 10685b624bc..ff70ed7a14c 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -43,7 +43,7 @@ import org.apache.hudi.hadoop.fs.HadoopFSUtils
 import org.apache.hudi.internal.schema.InternalSchema
 import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
 import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
-import org.apache.hudi.io.storage.HoodieFileReaderFactory
+import org.apache.hudi.io.storage.HoodieSparkIOFactory
 import org.apache.hudi.metadata.HoodieTableMetadata
 import org.apache.hudi.storage.{StoragePath, StoragePathInfo}
 import org.apache.avro.Schema
@@ -854,7 +854,7 @@ object HoodieBaseRelation extends SparkAdapterSupport {
     val hoodieConfig = new HoodieConfig()
     hoodieConfig.setValue(USE_NATIVE_HFILE_READER,
       options.getOrElse(USE_NATIVE_HFILE_READER.key(), USE_NATIVE_HFILE_READER.defaultValue().toString))
-    val reader = HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO)
+    val reader = (new HoodieSparkIOFactory).getReaderFactory(HoodieRecordType.AVRO)
       .getFileReader(hoodieConfig, storageConf, filePath, HFILE)
     val requiredRowSchema = requiredDataSchema.structTypeSchema
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
index 9f792daabef..283b6167fc4 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
@@ -62,7 +62,6 @@ import org.apache.hudi.exception.HoodieValidationException;
 import org.apache.hudi.exception.TableNotFoundException;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.io.storage.HoodieFileReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.metadata.HoodieTableMetadataUtil;
 import org.apache.hudi.storage.HoodieStorage;
@@ -108,6 +107,7 @@ import static org.apache.hudi.common.model.HoodieRecord.PARTITION_PATH_METADATA_
 import static org.apache.hudi.common.model.HoodieRecord.RECORD_KEY_METADATA_FIELD;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
 import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
+import static org.apache.hudi.io.storage.HoodieSparkIOFactory.getHoodieSparkIOFactory;
 import static org.apache.hudi.metadata.HoodieTableMetadata.getMetadataTableBasePath;
 /**
@@ -1487,7 +1487,7 @@ public class HoodieMetadataTableValidator implements Serializable {
     HoodieConfig hoodieConfig = new HoodieConfig();
     hoodieConfig.setValue(HoodieReaderConfig.USE_NATIVE_HFILE_READER,
         Boolean.toString(ConfigUtils.getBooleanWithAltKeys(props, HoodieReaderConfig.USE_NATIVE_HFILE_READER)));
-    try (HoodieFileReader fileReader = HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO)
+    try (HoodieFileReader fileReader = getHoodieSparkIOFactory().getReaderFactory(HoodieRecordType.AVRO)
         .getFileReader(hoodieConfig, metaClient.getStorageConf(), path)) {
       bloomFilter = fileReader.readBloomFilter();
       if (bloomFilter == null) {