This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 2cf7f8822134ce7e15acf1434b9d13075a15fd05
Author: Jon Vexler <[email protected]>
AuthorDate: Wed May 15 06:50:00 2024 -0700

    [HUDI-7744] Introduce IOFactory and a config to set the factory (#11192)

    Co-authored-by: Jonathan Vexler <=>
    Co-authored-by: Y Ethan Guo <[email protected]>
---
 .../org/apache/hudi/index/HoodieIndexUtils.java    |  4 +-
 .../java/org/apache/hudi/io/HoodieMergeHandle.java |  5 +-
 .../java/org/apache/hudi/io/HoodieReadHandle.java  |  6 +-
 .../table/action/commit/HoodieMergeHelper.java     |  9 +--
 .../GenericRecordValidationTestUtils.java          |  7 ++-
 .../run/strategy/JavaExecutionStrategy.java        |  6 +-
 .../hudi/client/TestJavaHoodieBackedMetadata.java  |  6 +-
 .../MultipleSparkJobExecutionStrategy.java         |  8 +--
 .../strategy/SingleSparkJobExecutionStrategy.java  |  5 +-
 .../hudi/io/storage/HoodieSparkIOFactory.java      | 49 ++++++++++++++++
 .../bootstrap/ParquetBootstrapMetadataHandler.java |  4 +-
 .../functional/TestHoodieBackedMetadata.java       | 10 ++--
 .../functional/TestHoodieBackedTableMetadata.java  |  4 +-
 .../hudi/common/config/HoodieStorageConfig.java    |  8 +++
 .../table/log/block/HoodieHFileDataBlock.java      | 18 +++---
 .../table/log/block/HoodieParquetDataBlock.java    |  4 +-
 .../table/timeline/HoodieArchivedTimeline.java     |  2 +
 .../hudi/io/storage/HoodieFileReaderFactory.java   | 27 ---------
 .../hudi/io/storage/HoodieFileWriterFactory.java   | 28 +--------
 .../apache/hudi/io/storage/HoodieIOFactory.java    | 51 ++++++++++++++++
 .../hudi/metadata/HoodieBackedTableMetadata.java   |  4 +-
 .../hudi/metadata/HoodieTableMetadataUtil.java     | 14 ++---
 .../hudi/sink/clustering/ClusteringOperator.java   |  7 ++-
 .../org/apache/hudi/common/util/HFileUtils.java    |  5 +-
 .../hudi/io/storage/HoodieHadoopIOFactory.java     | 68 ++++++++++++++++++++++
 .../io/hadoop/TestHoodieAvroFileReaderFactory.java |  8 ++-
 .../hudi/io/hadoop/TestHoodieOrcReaderWriter.java  |  4 +-
 .../hudi/hadoop/HoodieHFileRecordReader.java       |  8 ++-
 .../utils/HoodieRealtimeRecordReaderUtils.java     |  8 ++-
 .../reader/DFSHoodieDatasetInputReader.java        |  5 +-
 .../main/scala/org/apache/hudi/DefaultSource.scala |  6 +-
 .../scala/org/apache/hudi/HoodieBaseRelation.scala |  4 +-
 .../utilities/HoodieMetadataTableValidator.java    |  4 +-
 33 files changed, 276 insertions(+), 130 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
index 808bfdfa863..db32112750a 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
@@ -48,6 +48,7 @@ import org.apache.hudi.exception.HoodieIndexException;
 import org.apache.hudi.io.HoodieMergedReadHandle;
 import org.apache.hudi.io.storage.HoodieFileReader;
 import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.io.storage.HoodieIOFactory;
 import org.apache.hudi.keygen.BaseKeyGenerator;
 import org.apache.hudi.keygen.factory.HoodieAvroKeyGeneratorFactory;
 import org.apache.hudi.storage.StorageConfiguration;
@@ -185,7 +186,8 @@ public class HoodieIndexUtils {
                                                StorageConfiguration<?> configuration) throws HoodieIndexException {
     ValidationUtils.checkArgument(FSUtils.isBaseFile(filePath));
     List<String> foundRecordKeys = new ArrayList<>();
-    try (HoodieFileReader fileReader = HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO)
+    try (HoodieFileReader fileReader = HoodieIOFactory.getIOFactory(configuration)
+        .getReaderFactory(HoodieRecordType.AVRO)
         .getFileReader(DEFAULT_HUDI_CONFIG_FOR_READER, configuration, filePath)) {
       // Load all rowKeys from the file, to double-confirm
       if (!candidateRecordKeys.isEmpty()) {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index ed18a2f0055..3c3a820ab09 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -44,9 +44,9 @@ import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.io.storage.HoodieFileReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.hudi.io.storage.HoodieFileWriter;
 import org.apache.hudi.io.storage.HoodieFileWriterFactory;
+import org.apache.hudi.io.storage.HoodieIOFactory;
 import org.apache.hudi.keygen.BaseKeyGenerator;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.table.HoodieTable;
@@ -462,7 +462,8 @@ public class HoodieMergeHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O>
     }
     long oldNumWrites = 0;
-    try (HoodieFileReader reader = HoodieFileReaderFactory.getReaderFactory(this.recordMerger.getRecordType())
+    try (HoodieFileReader reader = HoodieIOFactory.getIOFactory(storage.getConf())
+        .getReaderFactory(this.recordMerger.getRecordType())
         .getFileReader(config, hoodieTable.getStorageConf(), oldFilePath)) {
       oldNumWrites = reader.getTotalRecords();
     } catch (IOException e) {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieReadHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieReadHandle.java
index 5f9afc1bad1..01678b68e96 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieReadHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieReadHandle.java
@@ -23,7 +23,7 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.io.storage.HoodieFileReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.io.storage.HoodieIOFactory;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.table.HoodieTable;
@@ -69,12 +69,12 @@ public abstract class HoodieReadHandle<T, I, K, O> extends HoodieIOHandle<T, I,
   }

   protected HoodieFileReader createNewFileReader() throws IOException {
-    return HoodieFileReaderFactory.getReaderFactory(this.config.getRecordMerger().getRecordType())
+    return HoodieIOFactory.getIOFactory(storage.getConf()).getReaderFactory(this.config.getRecordMerger().getRecordType())
         .getFileReader(config, hoodieTable.getStorageConf(), getLatestBaseFile().getStoragePath());
   }

   protected HoodieFileReader createNewFileReader(HoodieBaseFile hoodieBaseFile) throws IOException {
-    return HoodieFileReaderFactory.getReaderFactory(this.config.getRecordMerger().getRecordType())
+    return HoodieIOFactory.getIOFactory(storage.getConf()).getReaderFactory(this.config.getRecordMerger().getRecordType())
         .getFileReader(config, hoodieTable.getStorageConf(), hoodieBaseFile.getStoragePath());
   }
 }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
index 38383fd7a88..a13253bc1b0 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
@@ -37,7 +37,7 @@ import org.apache.hudi.internal.schema.utils.InternalSchemaUtils;
 import org.apache.hudi.internal.schema.utils.SerDeHelper;
 import org.apache.hudi.io.HoodieMergeHandle;
 import org.apache.hudi.io.storage.HoodieFileReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.io.storage.HoodieIOFactory;
 import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.table.HoodieTable;
@@ -80,7 +80,7 @@ public class HoodieMergeHelper<T> extends BaseMergeHelper {
     StorageConfiguration<?> storageConf = table.getStorageConf().newInstance();
     HoodieRecord.HoodieRecordType recordType = table.getConfig().getRecordMerger().getRecordType();
-    HoodieFileReader baseFileReader = HoodieFileReaderFactory
+    HoodieFileReader baseFileReader = HoodieIOFactory.getIOFactory(storageConf)
         .getReaderFactory(recordType)
         .getFileReader(writeConfig, storageConf, mergeHandle.getOldFilePath());
     HoodieFileReader bootstrapFileReader = null;
@@ -112,9 +112,10 @@ public class HoodieMergeHelper<T> extends BaseMergeHelper {
     if (baseFile.getBootstrapBaseFile().isPresent()) {
       StoragePath bootstrapFilePath = baseFile.getBootstrapBaseFile().get().getStoragePath();
       StorageConfiguration<?> bootstrapFileConfig = table.getStorageConf().newInstance();
-      bootstrapFileReader = HoodieFileReaderFactory.getReaderFactory(recordType).newBootstrapFileReader(
+      bootstrapFileReader = HoodieIOFactory.getIOFactory(storageConf).getReaderFactory(recordType).newBootstrapFileReader(
           baseFileReader,
-          HoodieFileReaderFactory.getReaderFactory(recordType).getFileReader(writeConfig, bootstrapFileConfig, bootstrapFilePath),
+          HoodieIOFactory.getIOFactory(storageConf).getReaderFactory(recordType)
+              .getFileReader(writeConfig, bootstrapFileConfig, bootstrapFilePath),
           mergeHandle.getPartitionFields(),
           mergeHandle.getPartitionValues());
       recordSchema = mergeHandle.getWriterSchemaWithMetaFields();
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/GenericRecordValidationTestUtils.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/GenericRecordValidationTestUtils.java
index 4a342cbcec2..34972f01832 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/GenericRecordValidationTestUtils.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/GenericRecordValidationTestUtils.java
@@ -30,7 +30,7 @@ import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
 import org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.io.storage.HoodieIOFactory;
 import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
@@ -145,9 +145,10 @@ public class GenericRecordValidationTestUtils {
   public static Stream<GenericRecord> readHFile(Configuration conf, String[] paths) {
     List<GenericRecord> valuesAsList = new LinkedList<>();
     for (String path : paths) {
+      StorageConfiguration storageConf = HadoopFSUtils.getStorageConf(conf);
       try (HoodieAvroHFileReaderImplBase reader = (HoodieAvroHFileReaderImplBase)
-          HoodieFileReaderFactory.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO)
-              .getFileReader(DEFAULT_HUDI_CONFIG_FOR_READER, HadoopFSUtils.getStorageConf(conf), new StoragePath(path), HoodieFileFormat.HFILE)) {
+          HoodieIOFactory.getIOFactory(storageConf).getReaderFactory(HoodieRecord.HoodieRecordType.AVRO)
+              .getFileReader(DEFAULT_HUDI_CONFIG_FOR_READER, storageConf, new StoragePath(path), HoodieFileFormat.HFILE)) {
         valuesAsList.addAll(HoodieAvroHFileReaderImplBase.readAllRecords(reader)
             .stream().map(e -> (GenericRecord) e).collect(Collectors.toList()));
       } catch (IOException e) {
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
index 02021dcc405..5b216807932 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
@@ -43,7 +43,7 @@ import org.apache.hudi.execution.bulkinsert.JavaBulkInsertInternalPartitionerFac
 import org.apache.hudi.execution.bulkinsert.JavaCustomColumnsSortPartitioner;
 import org.apache.hudi.io.IOUtils;
 import org.apache.hudi.io.storage.HoodieFileReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.io.storage.HoodieIOFactory;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.table.BulkInsertPartitioner;
 import org.apache.hudi.table.HoodieTable;
@@ -192,7 +192,7 @@ public abstract class JavaExecutionStrategy<T>
     baseFileReader = StringUtils.isNullOrEmpty(clusteringOp.getDataFilePath())
         ? Option.empty()
-        : Option.of(HoodieFileReaderFactory.getReaderFactory(recordType)
+        : Option.of(HoodieIOFactory.getIOFactory(table.getStorageConf()).getReaderFactory(recordType)
             .getFileReader(config, table.getStorageConf(), new StoragePath(clusteringOp.getDataFilePath())));
     HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig();
     Iterator<HoodieRecord<T>> fileSliceReader = new HoodieFileSliceReader(baseFileReader, scanner, readerSchema, tableConfig.getPreCombineField(), writeConfig.getRecordMerger(),
@@ -221,7 +221,7 @@ public abstract class JavaExecutionStrategy<T>
   private List<HoodieRecord<T>> readRecordsForGroupBaseFiles(List<ClusteringOperation> clusteringOps) {
     List<HoodieRecord<T>> records = new ArrayList<>();
     clusteringOps.forEach(clusteringOp -> {
-      try (HoodieFileReader baseFileReader = HoodieFileReaderFactory.getReaderFactory(recordType)
+      try (HoodieFileReader baseFileReader = HoodieIOFactory.getIOFactory(getHoodieTable().getStorageConf()).getReaderFactory(recordType)
           .getFileReader(getHoodieTable().getConfig(), getHoodieTable().getStorageConf(), new StoragePath(clusteringOp.getDataFilePath()))) {
         Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(getWriteConfig().getSchema()));
         Iterator<HoodieRecord> recordIterator = baseFileReader.getRecordIterator(readerSchema);
diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
index 8e62d640530..c2413133477 100644
--- a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
+++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
@@ -85,7 +85,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieMetadataException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.io.storage.HoodieIOFactory;
 import org.apache.hudi.metadata.FileSystemBackedTableMetadata;
 import org.apache.hudi.metadata.HoodieBackedTableMetadata;
 import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
@@ -544,7 +544,7 @@ public class TestJavaHoodieBackedMetadata extends TestHoodieMetadataBase {
     List<FileSlice> fileSlices = table.getSliceView().getLatestFileSlices("files").collect(Collectors.toList());
     HoodieBaseFile baseFile = fileSlices.get(0).getBaseFile().get();
     HoodieAvroHFileReaderImplBase hoodieHFileReader = (HoodieAvroHFileReaderImplBase)
-        HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO).getFileReader(
+        HoodieIOFactory.getIOFactory(context.getStorageConf()).getReaderFactory(HoodieRecordType.AVRO).getFileReader(
             writeConfig, context.getStorageConf(), new StoragePath(baseFile.getPath()));
     List<IndexedRecord> records = HoodieAvroHFileReaderImplBase.readAllRecords(hoodieHFileReader);
     records.forEach(entry -> {
@@ -971,7 +971,7 @@ public class TestJavaHoodieBackedMetadata extends TestHoodieMetadataBase {
     final HoodieBaseFile baseFile = fileSlices.get(0).getBaseFile().get();
     HoodieAvroHFileReaderImplBase hoodieHFileReader = (HoodieAvroHFileReaderImplBase)
-        HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO).getFileReader(
+        HoodieIOFactory.getIOFactory(storageConf).getReaderFactory(HoodieRecordType.AVRO).getFileReader(
             table.getConfig(), context.getStorageConf(), new StoragePath(baseFile.getPath()));
     List<IndexedRecord> records = HoodieAvroHFileReaderImplBase.readAllRecords(hoodieHFileReader);
     records.forEach(entry -> {
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
index ea1ae05e2b0..fe1e6710673 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
@@ -55,7 +55,6 @@ import org.apache.hudi.execution.bulkinsert.RowCustomColumnsSortPartitioner;
 import org.apache.hudi.execution.bulkinsert.RowSpatialCurveSortPartitioner;
 import org.apache.hudi.io.IOUtils;
 import org.apache.hudi.io.storage.HoodieFileReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.hudi.keygen.BaseKeyGenerator;
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
 import org.apache.hudi.storage.StorageConfiguration;
@@ -93,6 +92,7 @@ import java.util.stream.Stream;
 import static org.apache.hudi.client.utils.SparkPartitionUtils.getPartitionFieldVals;
 import static org.apache.hudi.common.config.HoodieCommonConfig.TIMESTAMP_AS_OF;
 import static org.apache.hudi.config.HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS;
+import static org.apache.hudi.io.storage.HoodieSparkIOFactory.getHoodieSparkIOFactory;

 /**
  * Clustering strategy to submit multiple spark jobs and union the results.
@@ -380,7 +380,7 @@ public abstract class MultipleSparkJobExecutionStrategy<T>
   private HoodieFileReader getBaseOrBootstrapFileReader(StorageConfiguration<?> storageConf, String bootstrapBasePath, Option<String[]> partitionFields, ClusteringOperation clusteringOp)
       throws IOException {
-    HoodieFileReader baseFileReader = HoodieFileReaderFactory.getReaderFactory(recordType)
+    HoodieFileReader baseFileReader = getHoodieSparkIOFactory().getReaderFactory(recordType)
         .getFileReader(writeConfig, storageConf, new StoragePath(clusteringOp.getDataFilePath()));
     // handle bootstrap path
     if (StringUtils.nonEmpty(clusteringOp.getBootstrapFilePath()) && StringUtils.nonEmpty(bootstrapBasePath)) {
@@ -392,9 +392,9 @@ public abstract class MultipleSparkJobExecutionStrategy<T>
         partitionValues = getPartitionFieldVals(partitionFields, partitionFilePath, bootstrapBasePath, baseFileReader.getSchema(),
             storageConf.unwrapAs(Configuration.class));
       }
-      baseFileReader = HoodieFileReaderFactory.getReaderFactory(recordType).newBootstrapFileReader(
+      baseFileReader = getHoodieSparkIOFactory().getReaderFactory(recordType).newBootstrapFileReader(
           baseFileReader,
-          HoodieFileReaderFactory.getReaderFactory(recordType).getFileReader(
+          getHoodieSparkIOFactory().getReaderFactory(recordType).getFileReader(
              writeConfig, storageConf, new StoragePath(bootstrapFilePath)),
          partitionFields, partitionValues);
     }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java
index 50eb9d4bd7a..06ba64dad89 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java
@@ -39,7 +39,6 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.data.HoodieJavaRDD;
 import org.apache.hudi.exception.HoodieClusteringException;
 import org.apache.hudi.io.storage.HoodieFileReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.hudi.keygen.BaseKeyGenerator;
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
 import org.apache.hudi.storage.StoragePath;
@@ -64,6 +63,8 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import java.util.stream.StreamSupport;

+import static org.apache.hudi.io.storage.HoodieSparkIOFactory.getHoodieSparkIOFactory;
+
 /**
  * Clustering strategy to submit single spark jobs.
  * MultipleSparkJobExecution strategy is not ideal for use cases that require large number of clustering groups
@@ -146,7 +147,7 @@ public abstract class SingleSparkJobExecutionStrategy<T>
     Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(getWriteConfig().getSchema()));
     Iterable<HoodieRecord<T>> indexedRecords = () -> {
       try {
-        HoodieFileReader baseFileReader = HoodieFileReaderFactory.getReaderFactory(recordType)
+        HoodieFileReader baseFileReader = getHoodieSparkIOFactory().getReaderFactory(recordType)
             .getFileReader(writeConfig, getHoodieTable().getStorageConf(), new StoragePath(clusteringOp.getDataFilePath()));
         Option<BaseKeyGenerator> keyGeneratorOp = HoodieSparkKeyGeneratorFactory.createBaseKeyGenerator(writeConfig);
         // NOTE: Record have to be cloned here to make sure if it holds low-level engine-specific
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkIOFactory.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkIOFactory.java
new file mode 100644
index 00000000000..16431d61551
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkIOFactory.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import org.apache.hudi.common.model.HoodieRecord;
+
+/**
+ * Creates readers and writers for SPARK and AVRO record payloads
+ */
+public class HoodieSparkIOFactory extends HoodieHadoopIOFactory {
+  private static final HoodieSparkIOFactory HOODIE_SPARK_IO_FACTORY = new HoodieSparkIOFactory();
+
+  public static HoodieSparkIOFactory getHoodieSparkIOFactory() {
+    return HOODIE_SPARK_IO_FACTORY;
+  }
+
+  @Override
+  public HoodieFileReaderFactory getReaderFactory(HoodieRecord.HoodieRecordType recordType) {
+    if (recordType == HoodieRecord.HoodieRecordType.SPARK) {
+      return new HoodieSparkFileReaderFactory();
+    }
+    return super.getReaderFactory(recordType);
+  }
+
+  @Override
+  public HoodieFileWriterFactory getWriterFactory(HoodieRecord.HoodieRecordType recordType) {
+    if (recordType == HoodieRecord.HoodieRecordType.SPARK) {
+      return new HoodieSparkFileWriterFactory();
+    }
+    return super.getWriterFactory(recordType);
+  }
+}
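A minimal sketch of the new Spark-side call pattern, assuming a HoodieWriteConfig `writeConfig`, a StorageConfiguration `storageConf`, and a base-file StoragePath `path` are in scope (these variable names are illustrative, not from the patch):

    import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
    import org.apache.hudi.io.storage.HoodieFileReader;

    import static org.apache.hudi.io.storage.HoodieSparkIOFactory.getHoodieSparkIOFactory;

    // SPARK record types resolve to HoodieSparkFileReaderFactory; AVRO falls
    // through to the behavior inherited from HoodieHadoopIOFactory.
    try (HoodieFileReader reader = getHoodieSparkIOFactory()
        .getReaderFactory(HoodieRecordType.SPARK)
        .getFileReader(writeConfig, storageConf, path)) {
      long totalRecords = reader.getTotalRecords();
    }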
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java
index 151e88432e3..adc6a456ac9 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java
@@ -31,7 +31,6 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.io.HoodieBootstrapHandle;
 import org.apache.hudi.io.storage.HoodieFileReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.hudi.keygen.KeyGeneratorInterface;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.table.HoodieTable;
@@ -58,6 +57,7 @@ import java.io.IOException;
 import java.util.function.Function;

 import static org.apache.hudi.io.HoodieBootstrapHandle.METADATA_BOOTSTRAP_RECORD_SCHEMA;
+import static org.apache.hudi.io.storage.HoodieSparkIOFactory.getHoodieSparkIOFactory;

 class ParquetBootstrapMetadataHandler extends BaseBootstrapMetadataHandler {
@@ -82,7 +82,7 @@ class ParquetBootstrapMetadataHandler extends BaseBootstrapMetadataHandler {
                             Schema schema) throws Exception {
     HoodieRecord.HoodieRecordType recordType = table.getConfig().getRecordMerger().getRecordType();
-    HoodieFileReader reader = HoodieFileReaderFactory.getReaderFactory(recordType)
+    HoodieFileReader reader = getHoodieSparkIOFactory().getReaderFactory(recordType)
         .getFileReader(table.getConfig(), table.getStorageConf(), sourceFilePath);
     HoodieExecutor<Void> executor = null;
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index 9301529c740..a83fcd4bf27 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -88,7 +88,6 @@ import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieMetadataException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.hudi.metadata.FileSystemBackedTableMetadata;
 import org.apache.hudi.metadata.HoodieBackedTableMetadata;
 import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
@@ -172,6 +171,7 @@ import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAM
 import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.getNextCommitTime;
 import static org.apache.hudi.config.HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS;
 import static org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.METADATA_COMPACTION_TIME_SUFFIX;
+import static org.apache.hudi.io.storage.HoodieSparkIOFactory.getHoodieSparkIOFactory;
 import static org.apache.hudi.metadata.HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP;
 import static org.apache.hudi.metadata.HoodieTableMetadata.getMetadataTableBasePath;
 import static org.apache.hudi.metadata.HoodieTableMetadataUtil.deleteMetadataTable;
@@ -821,7 +821,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
     List<FileSlice> fileSlices = table.getSliceView().getLatestFileSlices("files").collect(Collectors.toList());
     HoodieBaseFile baseFile = fileSlices.get(0).getBaseFile().get();
     HoodieAvroHFileReaderImplBase hoodieHFileReader = (HoodieAvroHFileReaderImplBase)
-        HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO).getFileReader(
+        getHoodieSparkIOFactory().getReaderFactory(HoodieRecordType.AVRO).getFileReader(
             table.getConfig(), context.getStorageConf(), new StoragePath(baseFile.getPath()));
     List<IndexedRecord> records = HoodieAvroHFileReaderImplBase.readAllRecords(hoodieHFileReader);
     records.forEach(entry -> {
@@ -1354,9 +1354,9 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
     }
     final HoodieBaseFile baseFile = fileSlices.get(0).getBaseFile().get();
-    HoodieAvroHFileReaderImplBase hoodieHFileReader = (HoodieAvroHFileReaderImplBase)
-        HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO).getFileReader(
-            table.getConfig(), context.getStorageConf(), new StoragePath(baseFile.getPath()));
+    HoodieAvroHFileReaderImplBase hoodieHFileReader = (HoodieAvroHFileReaderImplBase) getHoodieSparkIOFactory()
+        .getReaderFactory(HoodieRecordType.AVRO)
+        .getFileReader(table.getConfig(), context.getStorageConf(), new StoragePath(baseFile.getPath()));
     List<IndexedRecord> records = HoodieAvroHFileReaderImplBase.readAllRecords(hoodieHFileReader);
     records.forEach(entry -> {
       if (enableMetaFields) {
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
index 3310dda5633..c4a79f1ea71 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
@@ -46,7 +46,6 @@ import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.ExternalSpillableMap;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.hudi.metadata.HoodieBackedTableMetadata;
 import org.apache.hudi.metadata.HoodieMetadataLogRecordReader;
 import org.apache.hudi.metadata.HoodieMetadataPayload;
@@ -87,6 +86,7 @@ import static org.apache.hudi.common.model.WriteOperationType.COMPACT;
 import static org.apache.hudi.common.model.WriteOperationType.INSERT;
 import static org.apache.hudi.common.model.WriteOperationType.UPSERT;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.CLEAN_ACTION;
+import static org.apache.hudi.io.storage.HoodieSparkIOFactory.getHoodieSparkIOFactory;
 import static org.apache.hudi.metadata.MetadataPartitionType.FILES;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -527,7 +527,7 @@ public class TestHoodieBackedTableMetadata extends TestHoodieMetadataBase {
     final HoodieBaseFile baseFile = fileSlices.get(0).getBaseFile().get();
     HoodieAvroHFileReaderImplBase hoodieHFileReader = (HoodieAvroHFileReaderImplBase)
-        HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO).getFileReader(
+        getHoodieSparkIOFactory().getReaderFactory(HoodieRecordType.AVRO).getFileReader(
             table.getConfig(), context.getStorageConf(), new StoragePath(baseFile.getPath()));
     List<IndexedRecord> records = HoodieAvroHFileReaderImplBase.readAllRecords(hoodieHFileReader);
     records.forEach(entry -> {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java
index f3ad183def4..0309aee00a9 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java
@@ -235,6 +235,14 @@ public class HoodieStorageConfig extends HoodieConfig {
       + "and it is loaded at runtime. This is only required when trying to "
       + "override the existing write context when `hoodie.datasource.write.row.writer.enable=true`.");

+  public static final ConfigProperty<String> HOODIE_IO_FACTORY_CLASS = ConfigProperty
+      .key("hoodie.io.factory.class")
+      .defaultValue("org.apache.hudi.io.storage.HoodieHadoopIOFactory")
+      .markAdvanced()
+      .sinceVersion("0.15.0")
+      .withDocumentation("The fully-qualified class name of the factory class to return readers and writers of files used "
+          + "by Hudi. The provided class should implement `org.apache.hudi.io.storage.HoodieIOFactory`.");
+
   /**
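The new `hoodie.io.factory.class` option is resolved from the storage configuration at the point where readers and writers are created, so switching factories is a matter of setting the key before any Hudi I/O happens. A sketch, assuming a Hadoop Configuration and a hypothetical factory class `com.example.MyCustomIOFactory`:

    import org.apache.hadoop.conf.Configuration;

    import org.apache.hudi.common.config.HoodieStorageConfig;
    import org.apache.hudi.hadoop.fs.HadoopFSUtils;
    import org.apache.hudi.io.storage.HoodieIOFactory;
    import org.apache.hudi.storage.StorageConfiguration;

    Configuration hadoopConf = new Configuration();
    // The default is org.apache.hudi.io.storage.HoodieHadoopIOFactory.
    hadoopConf.set(HoodieStorageConfig.HOODIE_IO_FACTORY_CLASS.key(), "com.example.MyCustomIOFactory");
    StorageConfiguration<?> storageConf = HadoopFSUtils.getStorageConf(hadoopConf);
    // Loads the configured class reflectively (see the new HoodieIOFactory base class below).
    HoodieIOFactory factory = HoodieIOFactory.getIOFactory(storageConf);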
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
index 219fa2dc1c7..f3b79e05787 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
@@ -33,7 +33,7 @@ import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.io.SeekableDataInputStream;
 import org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase;
 import org.apache.hudi.io.storage.HoodieFileReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.io.storage.HoodieIOFactory;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.HoodieStorageUtils;
 import org.apache.hudi.storage.StorageConfiguration;
@@ -192,11 +192,10 @@ public class HoodieHFileDataBlock extends HoodieDataBlock {
     StorageConfiguration<?> storageConf = getBlockContentLocation().get().getStorageConf().getInline();
     HoodieStorage storage = HoodieStorageUtils.getStorage(pathForReader, storageConf);
     // Read the content
-    try (HoodieFileReader reader =
-             HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO).getContentReader(
-
-                 hFileReaderConfig, storageConf, pathForReader, HoodieFileFormat.HFILE, storage, content,
-                 Option.of(getSchemaFromHeader()))) {
+    try (HoodieFileReader reader = HoodieIOFactory.getIOFactory(storageConf)
+        .getReaderFactory(HoodieRecordType.AVRO)
+        .getContentReader(hFileReaderConfig, storageConf, pathForReader,
+            HoodieFileFormat.HFILE, storage, content, Option.of(getSchemaFromHeader()))) {
       return unsafeCast(reader.getRecordIterator(readerSchema));
     }
   }
@@ -216,10 +215,9 @@ public class HoodieHFileDataBlock extends HoodieDataBlock {
         blockContentLoc.getContentPositionInLogFile(),
         blockContentLoc.getBlockSize());

-    try (final HoodieAvroHFileReaderImplBase reader = (HoodieAvroHFileReaderImplBase)
-        HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO).getFileReader(
-            hFileReaderConfig, inlineConf, inlinePath, HoodieFileFormat.HFILE,
-            Option.of(getSchemaFromHeader()))) {
+    try (final HoodieAvroHFileReaderImplBase reader = (HoodieAvroHFileReaderImplBase) HoodieIOFactory.getIOFactory(inlineConf)
+        .getReaderFactory(HoodieRecordType.AVRO)
+        .getFileReader(hFileReaderConfig, inlineConf, inlinePath, HoodieFileFormat.HFILE, Option.of(getSchemaFromHeader()))) {
       // Get writer's schema from the header
       final ClosableIterator<HoodieRecord<IndexedRecord>> recordIterator =
           fullKey ? reader.getRecordsByKeysIterator(sortedKeys, readerSchema)
              : reader.getRecordsByKeyPrefixIterator(sortedKeys, readerSchema);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
index 28c025c9020..32f4f46a955 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
@@ -25,9 +25,9 @@ import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.io.SeekableDataInputStream;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.hudi.io.storage.HoodieFileWriter;
 import org.apache.hudi.io.storage.HoodieFileWriterFactory;
+import org.apache.hudi.io.storage.HoodieIOFactory;
 import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.inline.InLineFSUtils;
@@ -150,7 +150,7 @@ public class HoodieParquetDataBlock extends HoodieDataBlock {

     Schema writerSchema = new Schema.Parser().parse(this.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));

-    ClosableIterator<HoodieRecord<T>> iterator = HoodieFileReaderFactory.getReaderFactory(type)
+    ClosableIterator<HoodieRecord<T>> iterator = HoodieIOFactory.getIOFactory(inlineConf).getReaderFactory(type)
         .getFileReader(DEFAULT_HUDI_CONFIG_FOR_READER, inlineConf, inlineLogFilePath, PARQUET, Option.empty())
         .getRecordIterator(writerSchema, readerSchema);
     return iterator;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
index 587fd31866e..8914fa5249b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
@@ -35,6 +35,8 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.io.storage.HoodieAvroFileReader;
+import org.apache.hudi.io.storage.HoodieIOFactory;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.StoragePathInfo;
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java
index c285f04a2b2..8637c468fdd 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java
@@ -22,10 +22,7 @@ import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.config.HoodieReaderConfig;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieFileFormat;
-import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ReflectionUtils;
-import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
@@ -43,30 +40,6 @@ import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
  */
 public class HoodieFileReaderFactory {

-  public static HoodieFileReaderFactory getReaderFactory(HoodieRecord.HoodieRecordType recordType) {
-    switch (recordType) {
-      case AVRO:
-
-        try {
-          Class<?> clazz =
-              ReflectionUtils.getClass("org.apache.hudi.io.hadoop.HoodieAvroFileReaderFactory");
-          return (HoodieFileReaderFactory) clazz.newInstance();
-        } catch (IllegalArgumentException | IllegalAccessException | InstantiationException e) {
-          throw new HoodieException("Unable to create HoodieAvroFileReaderFactory", e);
-        }
-      case SPARK:
-        try {
-          Class<?> clazz =
-              ReflectionUtils.getClass("org.apache.hudi.io.storage.HoodieSparkFileReaderFactory");
-          return (HoodieFileReaderFactory) clazz.newInstance();
-        } catch (IllegalArgumentException | IllegalAccessException | InstantiationException e) {
-          throw new HoodieException("Unable to create HoodieSparkFileReaderFactory", e);
-        }
-      default:
-        throw new UnsupportedOperationException(recordType + " record type not supported yet.");
-    }
-  }
-
   public HoodieFileReader getFileReader(HoodieConfig hoodieConfig, StorageConfiguration<?> conf, StoragePath path) throws IOException {
     final String extension = FSUtils.getFileExtension(path.toString());
     if (PARQUET.getFileExtension().equals(extension)) {
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
index 1c588bce8af..c0e154ed6ab 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
@@ -25,10 +25,7 @@ import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieFileFormat;
-import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
-import org.apache.hudi.common.util.ReflectionUtils;
-import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
@@ -43,39 +40,18 @@ import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
 public class HoodieFileWriterFactory {

-  private static HoodieFileWriterFactory getWriterFactory(HoodieRecord.HoodieRecordType recordType) {
-    switch (recordType) {
-      case AVRO:
-        try {
-          Class<?> clazz = ReflectionUtils.getClass("org.apache.hudi.io.hadoop.HoodieAvroFileWriterFactory");
-          return (HoodieFileWriterFactory) clazz.newInstance();
-        } catch (IllegalAccessException | IllegalArgumentException | InstantiationException e) {
-          throw new HoodieException("Unable to create HoodieAvroFileWriterFactory", e);
-        }
-      case SPARK:
-        try {
-          Class<?> clazz = ReflectionUtils.getClass("org.apache.hudi.io.storage.HoodieSparkFileWriterFactory");
-          return (HoodieFileWriterFactory) clazz.newInstance();
-        } catch (IllegalAccessException | IllegalArgumentException | InstantiationException e) {
-          throw new HoodieException("Unable to create HoodieSparkFileWriterFactory", e);
-        }
-      default:
-        throw new UnsupportedOperationException(recordType + " record type not supported yet.");
-    }
-  }
-
   public static <T, I, K, O> HoodieFileWriter getFileWriter(
       String instantTime, StoragePath path, StorageConfiguration<?> conf, HoodieConfig config, Schema schema,
       TaskContextSupplier taskContextSupplier, HoodieRecordType recordType) throws IOException {
     final String extension = FSUtils.getFileExtension(path.getName());
-    HoodieFileWriterFactory factory = getWriterFactory(recordType);
+    HoodieFileWriterFactory factory = HoodieIOFactory.getIOFactory(conf).getWriterFactory(recordType);
     return factory.getFileWriterByFormat(extension, instantTime, path, conf, config, schema, taskContextSupplier);
   }

   public static <T, I, K, O> HoodieFileWriter getFileWriter(HoodieFileFormat format, OutputStream outputStream,
                                                             StorageConfiguration<?> conf, HoodieConfig config, Schema schema, HoodieRecordType recordType) throws IOException {
-    HoodieFileWriterFactory factory = getWriterFactory(recordType);
+    HoodieFileWriterFactory factory = HoodieIOFactory.getIOFactory(conf).getWriterFactory(recordType);
     return factory.getFileWriterByFormat(format, outputStream, conf, config, schema);
   }
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieIOFactory.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieIOFactory.java
new file mode 100644
index 00000000000..3e715366134
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieIOFactory.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import org.apache.hudi.common.config.HoodieStorageConfig;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.storage.StorageConfiguration;
+
+/**
+ * Base class to get HoodieFileReaderFactory and HoodieFileWriterFactory
+ */
+public abstract class HoodieIOFactory {
+
+  public static HoodieIOFactory getIOFactory(StorageConfiguration<?> storageConf) {
+    String ioFactoryClass = storageConf.getString(HoodieStorageConfig.HOODIE_IO_FACTORY_CLASS.key())
+        .orElse(HoodieStorageConfig.HOODIE_IO_FACTORY_CLASS.defaultValue());
+    return getIOFactory(ioFactoryClass);
+  }
+
+  private static HoodieIOFactory getIOFactory(String ioFactoryClass) {
+    try {
+      return ReflectionUtils.loadClass(ioFactoryClass);
+    } catch (Exception e) {
+      throw new HoodieException("Unable to create " + ioFactoryClass, e);
+    }
+  }
+
+  public abstract HoodieFileReaderFactory getReaderFactory(HoodieRecord.HoodieRecordType recordType);
+
+  public abstract HoodieFileWriterFactory getWriterFactory(HoodieRecord.HoodieRecordType recordType);
+
+}
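Because the factory is instantiated reflectively from the configured class name, a custom implementation only needs to be on the classpath with a no-arg constructor. A hypothetical sketch that keeps the stock Hadoop behavior, mirroring how HoodieSparkIOFactory extends HoodieHadoopIOFactory above (the class name and override are illustrative):

    package com.example;

    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.io.storage.HoodieFileReaderFactory;
    import org.apache.hudi.io.storage.HoodieHadoopIOFactory;

    /** Hypothetical factory: inherits the AVRO/SPARK handling from HoodieHadoopIOFactory. */
    public class MyCustomIOFactory extends HoodieHadoopIOFactory {
      @Override
      public HoodieFileReaderFactory getReaderFactory(HoodieRecord.HoodieRecordType recordType) {
        // Return a custom HoodieFileReaderFactory here to change how base files are
        // read; delegating to super keeps the default readers.
        return super.getReaderFactory(recordType);
      }
    }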
org.apache.hudi.storage.HoodieStorageUtils; import org.apache.hudi.storage.StorageConfiguration; @@ -504,9 +504,9 @@ public class HoodieTableMetadataUtil { } final StoragePath writeFilePath = new StoragePath(dataMetaClient.getBasePathV2(), pathWithPartition); - try (HoodieFileReader fileReader = - HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO).getFileReader( - hoodieConfig, dataMetaClient.getStorageConf(), writeFilePath)) { + try (HoodieFileReader fileReader = HoodieIOFactory.getIOFactory(dataMetaClient.getStorageConf()) + .getReaderFactory(HoodieRecordType.AVRO).getFileReader(hoodieConfig, + dataMetaClient.getStorageConf(), writeFilePath)) { try { final BloomFilter fileBloomFilter = fileReader.readBloomFilter(); if (fileBloomFilter == null) { @@ -926,7 +926,7 @@ public class HoodieTableMetadataUtil { private static ByteBuffer readBloomFilter(StorageConfiguration<?> conf, StoragePath filePath) throws IOException { HoodieConfig hoodieConfig = getReaderConfigs(conf); - try (HoodieFileReader fileReader = HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO) + try (HoodieFileReader fileReader = HoodieIOFactory.getIOFactory(conf).getReaderFactory(HoodieRecordType.AVRO) .getFileReader(hoodieConfig, conf, filePath)) { final BloomFilter fileBloomFilter = fileReader.readBloomFilter(); if (fileBloomFilter == null) { @@ -1781,7 +1781,7 @@ public class HoodieTableMetadataUtil { final String fileId = baseFile.getFileId(); final String instantTime = baseFile.getCommitTime(); - HoodieFileReader reader = HoodieFileReaderFactory.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO) + HoodieFileReader reader = HoodieIOFactory.getIOFactory(configuration).getReaderFactory(HoodieRecord.HoodieRecordType.AVRO) .getFileReader(config, configuration, dataFilePath); return getHoodieRecordIterator(reader.getRecordKeyIterator(), forDelete, partition, fileId, instantTime); }); @@ -1842,7 +1842,7 @@ public class HoodieTableMetadataUtil { final String fileId = baseFile.getFileId(); final String instantTime = baseFile.getCommitTime(); HoodieConfig hoodieConfig = getReaderConfigs(storageConf); - HoodieFileReader reader = HoodieFileReaderFactory.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO) + HoodieFileReader reader = HoodieIOFactory.getIOFactory(storageConf).getReaderFactory(HoodieRecord.HoodieRecordType.AVRO) .getFileReader(hoodieConfig, storageConf, dataFilePath); return getHoodieRecordIterator(reader.getRecordKeyIterator(), forDelete, partition, fileId, instantTime); }); diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java index 93a2f5d45d2..3709c27a8b8 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java @@ -44,6 +44,7 @@ import org.apache.hudi.io.IOUtils; import org.apache.hudi.io.storage.HoodieAvroFileReader; import org.apache.hudi.io.storage.HoodieFileReader; import org.apache.hudi.io.storage.HoodieFileReaderFactory; +import org.apache.hudi.io.storage.HoodieIOFactory; import org.apache.hudi.metrics.FlinkClusteringMetrics; import org.apache.hudi.sink.bulk.BulkInsertWriterHelper; import org.apache.hudi.sink.bulk.sort.SortOperatorGen; @@ -273,7 +274,8 @@ public class ClusteringOperator extends TableStreamOperator<ClusteringCommitEven try { 
Option<HoodieFileReader> baseFileReader = StringUtils.isNullOrEmpty(clusteringOp.getDataFilePath()) ? Option.empty() - : Option.of(HoodieFileReaderFactory.getReaderFactory(table.getConfig().getRecordMerger().getRecordType()) + : Option.of(HoodieIOFactory.getIOFactory(table.getStorageConf()) + .getReaderFactory(table.getConfig().getRecordMerger().getRecordType()) .getFileReader(table.getConfig(), table.getStorageConf(), new StoragePath(clusteringOp.getDataFilePath()))); HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder() .withStorage(table.getMetaClient().getStorage()) @@ -320,7 +322,8 @@ public class ClusteringOperator extends TableStreamOperator<ClusteringCommitEven List<Iterator<RowData>> iteratorsForPartition = clusteringOps.stream().map(clusteringOp -> { Iterable<IndexedRecord> indexedRecords = () -> { try { - HoodieFileReaderFactory fileReaderFactory = HoodieFileReaderFactory.getReaderFactory(table.getConfig().getRecordMerger().getRecordType()); + HoodieFileReaderFactory fileReaderFactory = HoodieIOFactory.getIOFactory(table.getStorageConf()) + .getReaderFactory(table.getConfig().getRecordMerger().getRecordType()); HoodieAvroFileReader fileReader = (HoodieAvroFileReader) fileReaderFactory.getFileReader( table.getConfig(), table.getStorageConf(), new StoragePath(clusteringOp.getDataFilePath())); diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/HFileUtils.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/HFileUtils.java index ad42c0e86fb..52c26477f47 100644 --- a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/HFileUtils.java +++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/HFileUtils.java @@ -26,7 +26,7 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.util.collection.ClosableIterator; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.io.storage.HoodieFileReader; -import org.apache.hudi.io.storage.HoodieFileReaderFactory; +import org.apache.hudi.io.storage.HoodieIOFactory; import org.apache.hudi.keygen.BaseKeyGenerator; import org.apache.hudi.storage.HoodieStorage; import org.apache.hudi.storage.StorageConfiguration; @@ -100,7 +100,8 @@ public class HFileUtils extends BaseFileUtils { LOG.info("Reading schema from {}", filePath); try (HoodieFileReader fileReader = - HoodieFileReaderFactory.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO) + HoodieIOFactory.getIOFactory(configuration) + .getReaderFactory(HoodieRecord.HoodieRecordType.AVRO) .getFileReader( ConfigUtils.DEFAULT_HUDI_CONFIG_FOR_READER, configuration, diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/HoodieHadoopIOFactory.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/HoodieHadoopIOFactory.java new file mode 100644 index 00000000000..65c8d028adb --- /dev/null +++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/HoodieHadoopIOFactory.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.io.storage; + +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.util.ReflectionUtils; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.io.hadoop.HoodieAvroFileReaderFactory; +import org.apache.hudi.io.hadoop.HoodieAvroFileWriterFactory; + +/** + * Creates readers and writers for AVRO record payloads. + * Currently uses reflection to support SPARK record payloads but + * this ability should be removed with [HUDI-7746] + */ +public class HoodieHadoopIOFactory extends HoodieIOFactory { + + @Override + public HoodieFileReaderFactory getReaderFactory(HoodieRecord.HoodieRecordType recordType) { + switch (recordType) { + case AVRO: + return new HoodieAvroFileReaderFactory(); + case SPARK: + //TODO: remove this case [HUDI-7746] + try { + return ReflectionUtils.loadClass("org.apache.hudi.io.storage.HoodieSparkFileReaderFactory"); + } catch (Exception e) { + throw new HoodieException("Unable to create HoodieSparkFileReaderFactory", e); + } + default: + throw new UnsupportedOperationException(recordType + " record type not supported"); + } + } + + @Override + public HoodieFileWriterFactory getWriterFactory(HoodieRecord.HoodieRecordType recordType) { + switch (recordType) { + case AVRO: + return new HoodieAvroFileWriterFactory(); + case SPARK: + //TODO: remove this case [HUDI-7746] + try { + return ReflectionUtils.loadClass("org.apache.hudi.io.storage.HoodieSparkFileWriterFactory"); + } catch (Exception e) { + throw new HoodieException("Unable to create HoodieSparkFileWriterFactory", e); + } + default: + throw new UnsupportedOperationException(recordType + " record type not supported"); + } + } +} diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieAvroFileReaderFactory.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieAvroFileReaderFactory.java index 7faf84a1ee5..85731674cd6 100644 --- a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieAvroFileReaderFactory.java +++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieAvroFileReaderFactory.java @@ -23,6 +23,7 @@ import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType; import org.apache.hudi.hadoop.fs.HadoopFSUtils; import org.apache.hudi.io.storage.HoodieFileReader; import org.apache.hudi.io.storage.HoodieFileReaderFactory; +import org.apache.hudi.io.storage.HoodieIOFactory; import org.apache.hudi.storage.StorageConfiguration; import org.apache.hudi.storage.StoragePath; @@ -48,7 +49,7 @@ public class TestHoodieAvroFileReaderFactory { // parquet file format. 
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieAvroFileReaderFactory.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieAvroFileReaderFactory.java
index 7faf84a1ee5..85731674cd6 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieAvroFileReaderFactory.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieAvroFileReaderFactory.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.io.storage.HoodieFileReader;
 import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.io.storage.HoodieIOFactory;
 import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
@@ -48,7 +49,7 @@ public class TestHoodieAvroFileReaderFactory {
     // parquet file format.
     final StorageConfiguration<?> storageConf = HadoopFSUtils.getStorageConf(new Configuration());
     final StoragePath parquetPath = new StoragePath("/partition/path/f1_1-0-1_000.parquet");
-    HoodieFileReader parquetReader = HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO)
+    HoodieFileReader parquetReader = HoodieIOFactory.getIOFactory(storageConf).getReaderFactory(HoodieRecordType.AVRO)
         .getFileReader(DEFAULT_HUDI_CONFIG_FOR_READER, storageConf, parquetPath);
     assertTrue(parquetReader instanceof HoodieAvroParquetReader);
@@ -56,14 +57,15 @@ public class TestHoodieAvroFileReaderFactory {
     final StoragePath logPath = new StoragePath(
         "/partition/path/f.b51192a8-574b-4a85-b246-bcfec03ac8bf_100.log.2_1-0-1");
     final Throwable thrown = assertThrows(UnsupportedOperationException.class, () -> {
-      HoodieFileReader logWriter = HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO)
+      HoodieFileReader logWriter = HoodieIOFactory.getIOFactory(storageConf).getReaderFactory(HoodieRecordType.AVRO)
           .getFileReader(DEFAULT_HUDI_CONFIG_FOR_READER, storageConf, logPath);
     }, "should fail since log storage reader is not supported yet.");
     assertTrue(thrown.getMessage().contains("format not supported yet."));

     // Orc file format.
     final StoragePath orcPath = new StoragePath("/partition/path/f1_1-0-1_000.orc");
-    HoodieFileReader orcReader = HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO)
+    HoodieFileReader orcReader = HoodieIOFactory.getIOFactory(storageConf)
+        .getReaderFactory(HoodieRecordType.AVRO)
         .getFileReader(DEFAULT_HUDI_CONFIG_FOR_READER, storageConf, orcPath);
     assertTrue(orcReader instanceof HoodieAvroOrcReader);
   }
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieOrcReaderWriter.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieOrcReaderWriter.java
index 6a94a32ed3c..0cf0ca9d445 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieOrcReaderWriter.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieOrcReaderWriter.java
@@ -28,7 +28,7 @@ import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.io.storage.HoodieAvroFileReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.io.storage.HoodieIOFactory;
 import org.apache.hudi.io.storage.HoodieOrcConfig;
 import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
@@ -78,7 +78,7 @@ public class TestHoodieOrcReaderWriter extends TestHoodieReaderWriterBase {
   @Override
   protected HoodieAvroFileReader createReader(
       StorageConfiguration<?> conf) throws Exception {
-    return (HoodieAvroFileReader) HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO)
+    return (HoodieAvroFileReader) HoodieIOFactory.getIOFactory(conf).getReaderFactory(HoodieRecordType.AVRO)
        .getFileReader(DEFAULT_HUDI_CONFIG_FOR_READER, conf, getFilePath());
   }
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileRecordReader.java
index 97177ab260d..85e9fcac311 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileRecordReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileRecordReader.java
@@ -26,7 +26,8 @@ import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
 import org.apache.hudi.io.storage.HoodieFileReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.io.storage.HoodieIOFactory;
+import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;

 import org.apache.avro.Schema;
@@ -56,8 +57,9 @@ public class HoodieHFileRecordReader implements RecordReader<NullWritable, Array
   public HoodieHFileRecordReader(Configuration conf, InputSplit split, JobConf job) throws IOException {
     FileSplit fileSplit = (FileSplit) split;
     StoragePath path = convertToStoragePath(fileSplit.getPath());
-    HoodieConfig hoodieConfig = getReaderConfigs(HadoopFSUtils.getStorageConf(conf));
-    reader = HoodieFileReaderFactory.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO)
+    StorageConfiguration<?> storageConf = HadoopFSUtils.getStorageConf(conf);
+    HoodieConfig hoodieConfig = getReaderConfigs(storageConf);
+    reader = HoodieIOFactory.getIOFactory(storageConf).getReaderFactory(HoodieRecord.HoodieRecordType.AVRO)
         .getFileReader(hoodieConfig, HadoopFSUtils.getStorageConf(conf), path,
             HoodieFileFormat.HFILE, Option.empty());
     schema = reader.getSchema();
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
index 666e51b81de..6d4b79c6896 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
@@ -25,7 +25,8 @@ import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.io.storage.HoodieFileReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.io.storage.HoodieIOFactory;
+import org.apache.hudi.storage.StorageConfiguration;

 import org.apache.avro.JsonProperties;
 import org.apache.avro.LogicalType;
@@ -306,8 +307,9 @@ public class HoodieRealtimeRecordReaderUtils {
   }

   public static HoodieFileReader getBaseFileReader(Path path, JobConf conf) throws IOException {
-    HoodieConfig hoodieConfig = getReaderConfigs(HadoopFSUtils.getStorageConf(conf));
-    return HoodieFileReaderFactory.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO)
+    StorageConfiguration<?> storageConf = HadoopFSUtils.getStorageConf(conf);
+    HoodieConfig hoodieConfig = getReaderConfigs(storageConf);
+    return HoodieIOFactory.getIOFactory(storageConf).getReaderFactory(HoodieRecord.HoodieRecordType.AVRO)
         .getFileReader(hoodieConfig, HadoopFSUtils.getStorageConf(conf), convertToStoragePath(path));
   }
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
index 0fcae011638..fd3cc287323 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
@@ -43,7 +43,7 @@ import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieMemoryConfig;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.io.storage.HoodieAvroFileReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.io.storage.HoodieIOFactory;

 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
@@ -274,7 +274,8 @@ public class DFSHoodieDatasetInputReader extends DFSDeltaInputReader {
     if (fileSlice.getBaseFile().isPresent()) {
       // Read the base files using the latest writer schema.
       Schema schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(schemaStr));
-      HoodieAvroFileReader reader = TypeUtils.unsafeCast(HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO)
+      HoodieAvroFileReader reader = TypeUtils.unsafeCast(HoodieIOFactory.getIOFactory(metaClient.getStorageConf())
+          .getReaderFactory(HoodieRecordType.AVRO)
           .getFileReader(
               DEFAULT_HUDI_CONFIG_FOR_READER,
               metaClient.getStorageConf(),
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
index c432707d4e2..3a942285f09 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -21,7 +21,7 @@ import org.apache.hudi.DataSourceReadOptions._
 import org.apache.hudi.DataSourceWriteOptions.{BOOTSTRAP_OPERATION_OPT_VAL, OPERATION, STREAMING_CHECKPOINT_IDENTIFIER}
 import org.apache.hudi.cdc.CDCRelation
 import org.apache.hudi.common.HoodieSchemaNotFoundException
-import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.config.HoodieStorageConfig
 import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE, MERGE_ON_READ}
 import org.apache.hudi.common.model.WriteConcurrencyMode
 import org.apache.hudi.common.table.timeline.HoodieInstant
@@ -32,6 +32,7 @@ import org.apache.hudi.config.HoodieBootstrapConfig.DATA_QUERIES_ONLY
 import org.apache.hudi.config.HoodieWriteConfig.WRITE_CONCURRENCY_MODE
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.hadoop.fs.HadoopFSUtils
+import org.apache.hudi.io.storage.HoodieSparkIOFactory
 import org.apache.hudi.storage.{HoodieStorageUtils, StoragePath}
 import org.apache.hudi.util.PathUtils
@@ -65,6 +66,9 @@ class DefaultSource extends RelationProvider
       // Enable "passPartitionByAsOptions" to support "write.partitionBy(...)"
       spark.conf.set("spark.sql.legacy.sources.write.passPartitionByAsOptions", "true")
     }
+    // Always use spark io factory
+    spark.sparkContext.hadoopConfiguration.set(HoodieStorageConfig.HOODIE_IO_FACTORY_CLASS.key(),
+      classOf[HoodieSparkIOFactory].getName)
     // Revisit EMRFS incompatibilities, for now disable
     spark.sparkContext.hadoopConfiguration.set("fs.s3.metadata.cache.expiration.seconds", "0")
   }
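DefaultSource pins the IO factory for all Spark paths by writing HOODIE_IO_FACTORY_CLASS into the Hadoop configuration, and getIOFactory instantiates whatever class that key names. The same mechanism can presumably be used to swap in a custom factory; a sketch under that assumption (com.example.CustomIOFactory is hypothetical, not part of this commit):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hudi.common.config.HoodieStorageConfig;
    import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
    import org.apache.hudi.hadoop.fs.HadoopFSUtils;
    import org.apache.hudi.io.storage.HoodieFileReaderFactory;
    import org.apache.hudi.io.storage.HoodieIOFactory;
    import org.apache.hudi.storage.StorageConfiguration;

    public class IOFactoryConfigSketch {
      public static void main(String[] args) {
        Configuration hadoopConf = new Configuration();
        // Hypothetical override; leaving the key unset falls back to the default factory.
        hadoopConf.set(HoodieStorageConfig.HOODIE_IO_FACTORY_CLASS.key(),
            "com.example.CustomIOFactory");
        StorageConfiguration<?> storageConf = HadoopFSUtils.getStorageConf(hadoopConf);
        // getIOFactory reflectively instantiates the configured class.
        HoodieFileReaderFactory readers = HoodieIOFactory.getIOFactory(storageConf)
            .getReaderFactory(HoodieRecordType.AVRO);
        System.out.println(readers.getClass().getName());
      }
    }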
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index ee815188d8e..a6f661c9e46 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -43,7 +43,7 @@ import org.apache.hudi.hadoop.fs.HadoopFSUtils
 import org.apache.hudi.internal.schema.InternalSchema
 import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
 import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
-import org.apache.hudi.io.storage.HoodieFileReaderFactory
+import org.apache.hudi.io.storage.HoodieSparkIOFactory
 import org.apache.hudi.metadata.HoodieTableMetadata
 import org.apache.hudi.storage.{StoragePath, StoragePathInfo}
 import org.apache.avro.Schema
@@ -758,7 +758,7 @@ object HoodieBaseRelation extends SparkAdapterSupport {
     val hoodieConfig = new HoodieConfig()
     hoodieConfig.setValue(USE_NATIVE_HFILE_READER,
       options.getOrElse(USE_NATIVE_HFILE_READER.key(), USE_NATIVE_HFILE_READER.defaultValue().toString))
-    val reader = HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO)
+    val reader = (new HoodieSparkIOFactory).getReaderFactory(HoodieRecordType.AVRO)
       .getFileReader(hoodieConfig, storageConf, filePath, HFILE)

     val requiredRowSchema = requiredDataSchema.structTypeSchema
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
index 0ec37e4a8fa..7ceaddeeb12 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
@@ -62,7 +62,6 @@ import org.apache.hudi.exception.HoodieValidationException;
 import org.apache.hudi.exception.TableNotFoundException;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.io.storage.HoodieFileReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.metadata.HoodieTableMetadataUtil;
 import org.apache.hudi.storage.HoodieStorage;
@@ -108,6 +107,7 @@ import static org.apache.hudi.common.model.HoodieRecord.PARTITION_PATH_METADATA_
 import static org.apache.hudi.common.model.HoodieRecord.RECORD_KEY_METADATA_FIELD;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
 import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
+import static org.apache.hudi.io.storage.HoodieSparkIOFactory.getHoodieSparkIOFactory;
 import static org.apache.hudi.metadata.HoodieTableMetadata.getMetadataTableBasePath;

 /**
@@ -1488,7 +1488,7 @@ public class HoodieMetadataTableValidator implements Serializable {
       HoodieConfig hoodieConfig = new HoodieConfig();
       hoodieConfig.setValue(HoodieReaderConfig.USE_NATIVE_HFILE_READER,
           Boolean.toString(ConfigUtils.getBooleanWithAltKeys(props, HoodieReaderConfig.USE_NATIVE_HFILE_READER)));
-      try (HoodieFileReader fileReader = HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO)
+      try (HoodieFileReader fileReader = getHoodieSparkIOFactory().getReaderFactory(HoodieRecordType.AVRO)
           .getFileReader(hoodieConfig, metaClient.getStorageConf(), path)) {
         bloomFilter = fileReader.readBloomFilter();
         if (bloomFilter == null) {
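On the Spark side the commit exposes two entry points, both visible in the hunks above: constructing HoodieSparkIOFactory directly (HoodieBaseRelation.scala) and the static getHoodieSparkIOFactory() accessor (HoodieMetadataTableValidator.java). A small sketch, assuming both resolve to equivalent factories:

    import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
    import org.apache.hudi.io.storage.HoodieFileReaderFactory;
    import org.apache.hudi.io.storage.HoodieSparkIOFactory;

    import static org.apache.hudi.io.storage.HoodieSparkIOFactory.getHoodieSparkIOFactory;

    public class SparkIOFactorySketch {
      public static void main(String[] args) {
        HoodieFileReaderFactory viaConstructor =
            new HoodieSparkIOFactory().getReaderFactory(HoodieRecordType.AVRO);
        HoodieFileReaderFactory viaAccessor =
            getHoodieSparkIOFactory().getReaderFactory(HoodieRecordType.AVRO);
        // Both paths should hand back the Spark-aware reader factory.
        System.out.println(viaConstructor.getClass() + " / " + viaAccessor.getClass());
      }
    }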
