This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch branch-0.x in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 9e35710f3bea311a8f849179e21fd86bbe4aa4f4 Author: Jon Vexler <[email protected]> AuthorDate: Wed May 15 06:09:15 2024 -0700 [HUDI-7350] Make Hudi reader and writer factory APIs Hadoop-independent (#11163) Abstract io reader and writer to de-hadoop --------- Co-authored-by: Jonathan Vexler <=> --- .../hudi/avro/TestHoodieAvroParquetWriter.java | 4 +- .../hudi/testutils/HoodieWriteableTestTable.java | 10 ++-- .../row/HoodieRowDataFileWriterFactory.java | 3 +- .../io/storage/row/HoodieRowDataParquetWriter.java | 2 +- .../io/storage/HoodieSparkFileWriterFactory.java | 5 +- .../hudi/io/storage/HoodieSparkParquetWriter.java | 1 + .../row/HoodieInternalRowFileWriterFactory.java | 3 +- .../row/HoodieInternalRowParquetWriter.java | 2 +- .../io/storage/row/HoodieRowParquetConfig.java | 8 +++- .../storage/TestHoodieAvroFileWriterFactory.java | 3 ++ .../hudi/common/table/TableSchemaResolver.java | 6 +-- .../table/log/block/HoodieParquetDataBlock.java | 54 +++++++++------------- .../hudi/io/storage/HoodieAvroFileReader.java | 28 +++++++++-- .../hudi/io/storage/HoodieAvroFileReaderBase.java | 48 ------------------- .../io/storage/HoodieAvroHFileReaderImplBase.java | 4 +- .../hudi/io/storage/HoodieFileReaderFactory.java | 11 ++++- .../hudi/io/storage/HoodieFileWriterFactory.java | 21 +++++---- .../io/storage/HoodieHBaseAvroHFileReader.java | 2 +- .../apache/hudi/io/storage/HoodieOrcConfig.java | 15 +++--- .../hudi/io/storage/HoodieParquetConfig.java | 15 +++--- .../io/storage/TestHoodieReaderWriterUtils.java | 2 +- .../io/hadoop}/HoodieAvroFileReaderFactory.java | 20 +++++--- .../io/hadoop}/HoodieAvroFileWriterFactory.java | 45 ++++++++++-------- .../{storage => hadoop}/HoodieAvroHFileWriter.java | 17 ++++--- .../hudi/io/hadoop}/HoodieAvroOrcReader.java | 23 +++++---- .../{storage => hadoop}/HoodieAvroOrcWriter.java | 19 ++++---- .../hudi/io/hadoop}/HoodieAvroParquetReader.java | 21 +++++---- .../HoodieAvroParquetWriter.java | 17 ++++--- .../HoodieBaseParquetWriter.java | 23 +++++---- .../apache/hudi/io/hadoop}/HoodieHFileConfig.java | 16 ++++--- .../hudi/io/hadoop}/HoodieParquetStreamWriter.java | 19 ++++---- .../parquet/io/OutputStreamBackedOutputFile.java | 0 .../TestHoodieAvroFileReaderFactory.java | 17 ++++--- .../TestHoodieBaseParquetWriter.java | 23 +++++---- .../TestHoodieHBaseHFileReaderWriter.java | 19 +++++--- .../TestHoodieHFileReaderWriter.java | 18 +++++--- .../TestHoodieHFileReaderWriterBase.java | 7 ++- .../TestHoodieOrcReaderWriter.java | 21 +++++---- .../TestHoodieReaderWriterBase.java | 6 ++- .../TestHoodieMergeOnReadSnapshotReader.java | 1 - .../org/apache/spark/sql/hudi/SparkHelpers.scala | 7 +-- .../org/apache/hudi/functional/TestBootstrap.java | 2 +- .../row/TestHoodieInternalRowParquetWriter.java | 3 +- 43 files changed, 324 insertions(+), 267 deletions(-) diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroParquetWriter.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroParquetWriter.java index 091d1d7195a..bff523f7f21 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroParquetWriter.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroParquetWriter.java @@ -25,7 +25,7 @@ import org.apache.hudi.common.bloom.BloomFilterTypeCode; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ParquetUtils; -import org.apache.hudi.io.storage.HoodieAvroParquetWriter; +import org.apache.hudi.io.hadoop.HoodieAvroParquetWriter; import org.apache.hudi.io.storage.HoodieParquetConfig; import org.apache.hudi.storage.StorageConfiguration; import org.apache.hudi.storage.StoragePath; @@ -71,7 +71,7 @@ public class TestHoodieAvroParquetWriter { HoodieParquetConfig<HoodieAvroWriteSupport> parquetConfig = new HoodieParquetConfig(writeSupport, CompressionCodecName.GZIP, ParquetWriter.DEFAULT_BLOCK_SIZE, - ParquetWriter.DEFAULT_PAGE_SIZE, 1024 * 1024 * 1024, storageConf.unwrap(), 0.1, true); + ParquetWriter.DEFAULT_PAGE_SIZE, 1024 * 1024 * 1024, storageConf, 0.1, true); StoragePath filePath = new StoragePath(tmpDir.resolve("test.parquet").toAbsolutePath().toString()); diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java index f6da22d7f74..e6521d03678 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java @@ -39,18 +39,18 @@ import org.apache.hudi.common.testutils.FileCreateUtils; import org.apache.hudi.common.testutils.HoodieMetadataTestTable; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; -import org.apache.hudi.io.storage.HoodieAvroOrcWriter; -import org.apache.hudi.io.storage.HoodieAvroParquetWriter; +import org.apache.hudi.io.hadoop.HoodieAvroOrcWriter; +import org.apache.hudi.io.hadoop.HoodieAvroParquetWriter; import org.apache.hudi.io.storage.HoodieOrcConfig; import org.apache.hudi.io.storage.HoodieParquetConfig; import org.apache.hudi.metadata.HoodieTableMetadataWriter; import org.apache.hudi.storage.HoodieStorage; +import org.apache.hudi.storage.StorageConfiguration; import org.apache.hudi.storage.StoragePath; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; -import org.apache.hadoop.conf.Configuration; import org.apache.orc.CompressionKind; import org.apache.parquet.avro.AvroSchemaConverter; import org.apache.parquet.hadoop.ParquetWriter; @@ -124,7 +124,7 @@ public class HoodieWriteableTestTable extends HoodieMetadataTestTable { new AvroSchemaConverter().convert(schema), schema, Option.of(filter), new Properties()); HoodieParquetConfig<HoodieAvroWriteSupport> config = new HoodieParquetConfig<>(writeSupport, CompressionCodecName.GZIP, ParquetWriter.DEFAULT_BLOCK_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE, 120 * 1024 * 1024, - new Configuration(), Double.parseDouble(HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION.defaultValue()), true); + storage.getConf(), Double.parseDouble(HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION.defaultValue()), true); try (HoodieAvroParquetWriter writer = new HoodieAvroParquetWriter( new StoragePath(Paths.get(basePath, partition, fileName).toString()), config, currentInstantTime, contextSupplier, populateMetaFields)) { @@ -142,7 +142,7 @@ public class HoodieWriteableTestTable extends HoodieMetadataTestTable { } } } else if (HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().equals(HoodieFileFormat.ORC)) { - Configuration conf = new Configuration(); + StorageConfiguration conf = storage.getConf().newInstance(); int orcStripSize = Integer.parseInt(HoodieStorageConfig.ORC_STRIPE_SIZE.defaultValue()); int orcBlockSize = Integer.parseInt(HoodieStorageConfig.ORC_BLOCK_SIZE.defaultValue()); int maxFileSize = Integer.parseInt(HoodieStorageConfig.ORC_FILE_MAX_SIZE.defaultValue()); diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriterFactory.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriterFactory.java index 072bde04756..e9bc86b4a76 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriterFactory.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriterFactory.java @@ -24,6 +24,7 @@ import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.io.storage.HoodieParquetConfig; import org.apache.hudi.storage.StoragePath; +import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration; import org.apache.hudi.table.HoodieTable; import org.apache.flink.table.types.logical.RowType; @@ -76,7 +77,7 @@ public class HoodieRowDataFileWriterFactory { writeConfig.getParquetBlockSize(), writeConfig.getParquetPageSize(), writeConfig.getParquetMaxFileSize(), - writeSupport.getHadoopConf(), + new HadoopStorageConfiguration(writeSupport.getHadoopConf()), writeConfig.getParquetCompressionRatio(), writeConfig.parquetDictionaryEnabled())); } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriter.java index 8acd1ef9dd1..200662cc138 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriter.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriter.java @@ -18,7 +18,7 @@ package org.apache.hudi.io.storage.row; -import org.apache.hudi.io.storage.HoodieBaseParquetWriter; +import org.apache.hudi.io.hadoop.HoodieBaseParquetWriter; import org.apache.hudi.io.storage.HoodieParquetConfig; import org.apache.hudi.storage.StoragePath; diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriterFactory.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriterFactory.java index ee98ff322a3..ff17b48bf0c 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriterFactory.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriterFactory.java @@ -38,6 +38,7 @@ import org.apache.spark.sql.HoodieInternalRowUtils; import org.apache.spark.sql.types.StructType; import java.io.IOException; +import java.io.OutputStream; public class HoodieSparkFileWriterFactory extends HoodieFileWriterFactory { @@ -67,7 +68,7 @@ public class HoodieSparkFileWriterFactory extends HoodieFileWriterFactory { } protected HoodieFileWriter newParquetFileWriter( - FSDataOutputStream outputStream, StorageConfiguration<?> conf, HoodieConfig config, Schema schema) throws IOException { + OutputStream outputStream, StorageConfiguration<?> conf, HoodieConfig config, Schema schema) throws IOException { boolean enableBloomFilter = false; HoodieRowParquetWriteSupport writeSupport = getHoodieRowParquetWriteSupport(conf, schema, config, enableBloomFilter); String compressionCodecName = config.getStringOrDefault(HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME); @@ -83,7 +84,7 @@ public class HoodieSparkFileWriterFactory extends HoodieFileWriterFactory { writeSupport.getHadoopConf(), config.getDouble(HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION), config.getBooleanOrDefault(HoodieStorageConfig.PARQUET_DICTIONARY_ENABLED)); parquetConfig.getHadoopConf().addResource(writeSupport.getHadoopConf()); - return new HoodieSparkParquetStreamWriter(outputStream, parquetConfig); + return new HoodieSparkParquetStreamWriter(new FSDataOutputStream(outputStream, null), parquetConfig); } @Override diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetWriter.java index 09f8d8dbe1c..ba4ab63006d 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetWriter.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetWriter.java @@ -21,6 +21,7 @@ package org.apache.hudi.io.storage; import org.apache.hudi.common.engine.TaskContextSupplier; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.io.hadoop.HoodieBaseParquetWriter; import org.apache.hudi.io.storage.row.HoodieRowParquetConfig; import org.apache.hudi.io.storage.row.HoodieRowParquetWriteSupport; import org.apache.hudi.storage.StoragePath; diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowFileWriterFactory.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowFileWriterFactory.java index ffad5a895cb..8e7287a7024 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowFileWriterFactory.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowFileWriterFactory.java @@ -25,6 +25,7 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.io.storage.HoodieParquetConfig; import org.apache.hudi.storage.StoragePath; +import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration; import org.apache.hudi.table.HoodieTable; import org.apache.hadoop.conf.Configuration; @@ -79,7 +80,7 @@ public class HoodieInternalRowFileWriterFactory { writeConfig.getParquetBlockSize(), writeConfig.getParquetPageSize(), writeConfig.getParquetMaxFileSize(), - writeSupport.getHadoopConf(), + new HadoopStorageConfiguration(writeSupport.getHadoopConf()), writeConfig.getParquetCompressionRatio(), writeConfig.parquetDictionaryEnabled() )); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowParquetWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowParquetWriter.java index dcb1f197a04..f7ad33d2cbb 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowParquetWriter.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowParquetWriter.java @@ -18,7 +18,7 @@ package org.apache.hudi.io.storage.row; -import org.apache.hudi.io.storage.HoodieBaseParquetWriter; +import org.apache.hudi.io.hadoop.HoodieBaseParquetWriter; import org.apache.hudi.io.storage.HoodieParquetConfig; import org.apache.hudi.storage.StoragePath; diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetConfig.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetConfig.java index f5f6d7b0a5b..f3b0f34b929 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetConfig.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetConfig.java @@ -19,6 +19,7 @@ package org.apache.hudi.io.storage.row; import org.apache.hudi.io.storage.HoodieParquetConfig; +import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration; import org.apache.hadoop.conf.Configuration; import org.apache.parquet.hadoop.metadata.CompressionCodecName; @@ -31,6 +32,11 @@ public class HoodieRowParquetConfig extends HoodieParquetConfig<HoodieRowParquet public HoodieRowParquetConfig(HoodieRowParquetWriteSupport writeSupport, CompressionCodecName compressionCodecName, int blockSize, int pageSize, long maxFileSize, Configuration hadoopConf, double compressionRatio, boolean enableDictionary) { - super(writeSupport, compressionCodecName, blockSize, pageSize, maxFileSize, hadoopConf, compressionRatio, enableDictionary); + super(writeSupport, compressionCodecName, blockSize, pageSize, maxFileSize, + new HadoopStorageConfiguration(hadoopConf), compressionRatio, enableDictionary); + } + + public Configuration getHadoopConf() { + return getStorageConf().unwrapAs(Configuration.class); } } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/TestHoodieAvroFileWriterFactory.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/TestHoodieAvroFileWriterFactory.java index 4a13c77b629..74826c6f39b 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/TestHoodieAvroFileWriterFactory.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/TestHoodieAvroFileWriterFactory.java @@ -24,6 +24,9 @@ import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.index.HoodieIndex.IndexType; +import org.apache.hudi.io.hadoop.HoodieAvroHFileWriter; +import org.apache.hudi.io.hadoop.HoodieAvroOrcWriter; +import org.apache.hudi.io.hadoop.HoodieAvroParquetWriter; import org.apache.hudi.storage.StoragePath; import org.apache.hudi.table.HoodieSparkTable; import org.apache.hudi.table.HoodieTable; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java index 527b9c2655e..9b317f54713 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java @@ -43,7 +43,6 @@ import org.apache.hudi.internal.schema.HoodieSchemaException; import org.apache.hudi.internal.schema.InternalSchema; import org.apache.hudi.internal.schema.io.FileBasedInternalSchemaStorageManager; import org.apache.hudi.internal.schema.utils.SerDeHelper; -import org.apache.hudi.io.storage.HoodieAvroOrcReader; import org.apache.hudi.io.storage.HoodieFileReader; import org.apache.hudi.io.storage.HoodieFileReaderFactory; import org.apache.hudi.storage.HoodieStorage; @@ -352,8 +351,9 @@ public class TableSchemaResolver { private MessageType readSchemaFromORCBaseFile(StoragePath orcFilePath) throws IOException { LOG.info("Reading schema from {}", orcFilePath); - - HoodieAvroOrcReader orcReader = new HoodieAvroOrcReader(metaClient.getRawHoodieStorage().getConf(), orcFilePath); + HoodieFileReader orcReader = HoodieFileReaderFactory.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO) + .getFileReader(metaClient.getTableConfig(), metaClient.getRawHoodieStorage().getConf(), orcFilePath, + HoodieFileFormat.ORC, Option.empty()); return convertAvroSchemaToParquet(orcReader.getSchema()); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java index d426480fc68..aca30456b17 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java @@ -35,7 +35,6 @@ import org.apache.hudi.storage.inline.InLineFSUtils; import org.apache.avro.Schema; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.parquet.hadoop.ParquetWriter; import org.apache.parquet.hadoop.metadata.CompressionCodecName; @@ -105,38 +104,31 @@ public class HoodieParquetDataBlock extends HoodieDataBlock { } Schema writerSchema = new Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA)); - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - try (FSDataOutputStream outputStream = new FSDataOutputStream(baos, null)) { - HoodieFileWriter parquetWriter = null; - HoodieConfig config = new HoodieConfig(); - config.setValue(PARQUET_COMPRESSION_CODEC_NAME.key(), compressionCodecName.get().name()); - config.setValue(PARQUET_BLOCK_SIZE.key(), String.valueOf(ParquetWriter.DEFAULT_BLOCK_SIZE)); - config.setValue(PARQUET_PAGE_SIZE.key(), String.valueOf(ParquetWriter.DEFAULT_PAGE_SIZE)); - config.setValue(PARQUET_MAX_FILE_SIZE.key(), String.valueOf(1024 * 1024 * 1024)); - config.setValue(PARQUET_COMPRESSION_RATIO_FRACTION.key(), String.valueOf(expectedCompressionRatio.get())); - config.setValue(PARQUET_DICTIONARY_ENABLED, String.valueOf(useDictionaryEncoding.get())); - HoodieRecordType recordType = records.iterator().next().getRecordType(); - try { - parquetWriter = HoodieFileWriterFactory.getFileWriter( - HoodieFileFormat.PARQUET, - outputStream, - HoodieStorageUtils.getStorageConf(new Configuration()), - config, - writerSchema, - recordType); - for (HoodieRecord<?> record : records) { - String recordKey = getRecordKey(record).orElse(null); - parquetWriter.write(recordKey, record, writerSchema); - } - outputStream.flush(); - } finally { - if (parquetWriter != null) { - parquetWriter.close(); - } + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + HoodieConfig config = new HoodieConfig(); + config.setValue(PARQUET_COMPRESSION_CODEC_NAME.key(), compressionCodecName.get().name()); + config.setValue(PARQUET_BLOCK_SIZE.key(), String.valueOf(ParquetWriter.DEFAULT_BLOCK_SIZE)); + config.setValue(PARQUET_PAGE_SIZE.key(), String.valueOf(ParquetWriter.DEFAULT_PAGE_SIZE)); + config.setValue(PARQUET_MAX_FILE_SIZE.key(), String.valueOf(1024 * 1024 * 1024)); + config.setValue(PARQUET_COMPRESSION_RATIO_FRACTION.key(), String.valueOf(expectedCompressionRatio.get())); + config.setValue(PARQUET_DICTIONARY_ENABLED, String.valueOf(useDictionaryEncoding.get())); + HoodieRecordType recordType = records.iterator().next().getRecordType(); + HoodieFileWriter parquetWriter = null; + try { + parquetWriter = HoodieFileWriterFactory.getFileWriter( + HoodieFileFormat.PARQUET, outputStream, HoodieStorageUtils.getStorageConf(new Configuration()), + config, writerSchema, recordType); + for (HoodieRecord<?> record : records) { + String recordKey = getRecordKey(record).orElse(null); + parquetWriter.write(recordKey, record, writerSchema); + } + outputStream.flush(); + } finally { + if (parquetWriter != null) { + parquetWriter.close(); } } - - return baos.toByteArray(); + return outputStream.toByteArray(); } /** diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReader.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReader.java index a829880d5f9..9b49fa871e2 100644 --- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReader.java @@ -18,10 +18,32 @@ package org.apache.hudi.io.storage; +import org.apache.hudi.common.model.HoodieAvroIndexedRecord; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.util.collection.ClosableIterator; +import org.apache.hudi.common.util.collection.CloseableMappingIterator; + +import org.apache.avro.Schema; import org.apache.avro.generic.IndexedRecord; +import java.io.IOException; + +import static org.apache.hudi.common.util.TypeUtils.unsafeCast; + /** - * Marker interface for every {@link HoodieFileReader} reading in Avro (ie - * producing {@link IndexedRecord}s) + * Base class for every Avro file reader */ -public interface HoodieAvroFileReader extends HoodieFileReader<IndexedRecord> {} +public abstract class HoodieAvroFileReader implements HoodieFileReader<IndexedRecord> { + + @Override + public ClosableIterator<HoodieRecord<IndexedRecord>> getRecordIterator(Schema readerSchema, Schema requestedSchema) throws IOException { + ClosableIterator<IndexedRecord> iterator = getIndexedRecordIterator(readerSchema, requestedSchema); + return new CloseableMappingIterator<>(iterator, data -> unsafeCast(new HoodieAvroIndexedRecord(data))); + } + + protected ClosableIterator<IndexedRecord> getIndexedRecordIterator(Schema readerSchema) throws IOException { + return getIndexedRecordIterator(readerSchema, readerSchema); + } + + public abstract ClosableIterator<IndexedRecord> getIndexedRecordIterator(Schema readerSchema, Schema requestedSchema) throws IOException; +} diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReaderBase.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReaderBase.java deleted file mode 100644 index af65bac055c..00000000000 --- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReaderBase.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.io.storage; - -import org.apache.avro.Schema; -import org.apache.avro.generic.IndexedRecord; -import org.apache.hudi.common.model.HoodieAvroIndexedRecord; -import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.util.collection.ClosableIterator; -import org.apache.hudi.common.util.collection.CloseableMappingIterator; - -import java.io.IOException; - -import static org.apache.hudi.common.util.TypeUtils.unsafeCast; - -/** - * Base class for every {@link HoodieAvroFileReader} - */ -abstract class HoodieAvroFileReaderBase implements HoodieAvroFileReader { - - @Override - public ClosableIterator<HoodieRecord<IndexedRecord>> getRecordIterator(Schema readerSchema, Schema requestedSchema) throws IOException { - ClosableIterator<IndexedRecord> iterator = getIndexedRecordIterator(readerSchema, requestedSchema); - return new CloseableMappingIterator<>(iterator, data -> unsafeCast(new HoodieAvroIndexedRecord(data))); - } - - protected ClosableIterator<IndexedRecord> getIndexedRecordIterator(Schema readerSchema) throws IOException { - return getIndexedRecordIterator(readerSchema, readerSchema); - } - - protected abstract ClosableIterator<IndexedRecord> getIndexedRecordIterator(Schema readerSchema, Schema requestedSchema) throws IOException; -} diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileReaderImplBase.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileReaderImplBase.java index 5e1a260e158..dd28d5f5589 100644 --- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileReaderImplBase.java +++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileReaderImplBase.java @@ -38,7 +38,7 @@ import java.util.stream.Collectors; import static org.apache.hudi.common.util.CollectionUtils.toStream; import static org.apache.hudi.common.util.StringUtils.fromUTF8Bytes; -public abstract class HoodieAvroHFileReaderImplBase extends HoodieAvroFileReaderBase +public abstract class HoodieAvroHFileReaderImplBase extends HoodieAvroFileReader implements HoodieSeekingFileReader<IndexedRecord> { // TODO HoodieHFileReader right now tightly coupled to MT, we should break that coupling public static final String SCHEMA_KEY = "schema"; @@ -54,7 +54,7 @@ public abstract class HoodieAvroHFileReaderImplBase extends HoodieAvroFileReader * <p> * Reads all the records with given schema */ - public static List<IndexedRecord> readAllRecords(HoodieAvroFileReaderBase reader) + public static List<IndexedRecord> readAllRecords(HoodieAvroFileReader reader) throws IOException { Schema schema = reader.getSchema(); return toStream(reader.getIndexedRecordIterator(schema)) diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java index fe075ccdc8f..c285f04a2b2 100644 --- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java +++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java @@ -46,14 +46,21 @@ public class HoodieFileReaderFactory { public static HoodieFileReaderFactory getReaderFactory(HoodieRecord.HoodieRecordType recordType) { switch (recordType) { case AVRO: - return new HoodieAvroFileReaderFactory(); + + try { + Class<?> clazz = + ReflectionUtils.getClass("org.apache.hudi.io.hadoop.HoodieAvroFileReaderFactory"); + return (HoodieFileReaderFactory) clazz.newInstance(); + } catch (IllegalArgumentException | IllegalAccessException | InstantiationException e) { + throw new HoodieException("Unable to create HoodieAvroFileReaderFactory", e); + } case SPARK: try { Class<?> clazz = ReflectionUtils.getClass("org.apache.hudi.io.storage.HoodieSparkFileReaderFactory"); return (HoodieFileReaderFactory) clazz.newInstance(); } catch (IllegalArgumentException | IllegalAccessException | InstantiationException e) { - throw new HoodieException("Unable to create hoodie spark file writer factory", e); + throw new HoodieException("Unable to create HoodieSparkFileReaderFactory", e); } default: throw new UnsupportedOperationException(recordType + " record type not supported yet."); diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java index 4ca426c2513..1c588bce8af 100644 --- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java +++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java @@ -33,9 +33,9 @@ import org.apache.hudi.storage.StorageConfiguration; import org.apache.hudi.storage.StoragePath; import org.apache.avro.Schema; -import org.apache.hadoop.fs.FSDataOutputStream; import java.io.IOException; +import java.io.OutputStream; import static org.apache.hudi.common.model.HoodieFileFormat.HFILE; import static org.apache.hudi.common.model.HoodieFileFormat.ORC; @@ -46,13 +46,18 @@ public class HoodieFileWriterFactory { private static HoodieFileWriterFactory getWriterFactory(HoodieRecord.HoodieRecordType recordType) { switch (recordType) { case AVRO: - return new HoodieAvroFileWriterFactory(); + try { + Class<?> clazz = ReflectionUtils.getClass("org.apache.hudi.io.hadoop.HoodieAvroFileWriterFactory"); + return (HoodieFileWriterFactory) clazz.newInstance(); + } catch (IllegalAccessException | IllegalArgumentException | InstantiationException e) { + throw new HoodieException("Unable to create HoodieAvroFileWriterFactory", e); + } case SPARK: try { Class<?> clazz = ReflectionUtils.getClass("org.apache.hudi.io.storage.HoodieSparkFileWriterFactory"); return (HoodieFileWriterFactory) clazz.newInstance(); } catch (IllegalAccessException | IllegalArgumentException | InstantiationException e) { - throw new HoodieException("Unable to create hoodie spark file writer factory", e); + throw new HoodieException("Unable to create HoodieSparkFileWriterFactory", e); } default: throw new UnsupportedOperationException(recordType + " record type not supported yet."); @@ -67,8 +72,8 @@ public class HoodieFileWriterFactory { return factory.getFileWriterByFormat(extension, instantTime, path, conf, config, schema, taskContextSupplier); } - public static <T, I, K, O> HoodieFileWriter getFileWriter(HoodieFileFormat format, - FSDataOutputStream outputStream, StorageConfiguration<?> conf, HoodieConfig config, Schema schema, HoodieRecordType recordType) + public static <T, I, K, O> HoodieFileWriter getFileWriter(HoodieFileFormat format, OutputStream outputStream, + StorageConfiguration<?> conf, HoodieConfig config, Schema schema, HoodieRecordType recordType) throws IOException { HoodieFileWriterFactory factory = getWriterFactory(recordType); return factory.getFileWriterByFormat(format, outputStream, conf, config, schema); @@ -89,8 +94,8 @@ public class HoodieFileWriterFactory { throw new UnsupportedOperationException(extension + " format not supported yet."); } - protected <T, I, K, O> HoodieFileWriter getFileWriterByFormat(HoodieFileFormat format, - FSDataOutputStream outputStream, StorageConfiguration<?> conf, HoodieConfig config, Schema schema) throws IOException { + protected <T, I, K, O> HoodieFileWriter getFileWriterByFormat(HoodieFileFormat format, OutputStream outputStream, + StorageConfiguration<?> conf, HoodieConfig config, Schema schema) throws IOException { switch (format) { case PARQUET: return newParquetFileWriter(outputStream, conf, config, schema); @@ -106,7 +111,7 @@ public class HoodieFileWriterFactory { } protected HoodieFileWriter newParquetFileWriter( - FSDataOutputStream outputStream, StorageConfiguration<?> conf, HoodieConfig config, Schema schema) throws IOException { + OutputStream outputStream, StorageConfiguration<?> conf, HoodieConfig config, Schema schema) throws IOException { throw new UnsupportedOperationException(); } diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHBaseAvroHFileReader.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHBaseAvroHFileReader.java index 4a82eddd70b..fd78ef51068 100644 --- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHBaseAvroHFileReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHBaseAvroHFileReader.java @@ -203,7 +203,7 @@ public class HoodieHBaseAvroHFileReader extends HoodieAvroHFileReaderImplBase { } @Override - protected ClosableIterator<IndexedRecord> getIndexedRecordIterator(Schema readerSchema, Schema requestedSchema) { + public ClosableIterator<IndexedRecord> getIndexedRecordIterator(Schema readerSchema, Schema requestedSchema) { if (!Objects.equals(readerSchema, requestedSchema)) { throw new UnsupportedOperationException("Schema projections are not supported in HFile reader"); } diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieOrcConfig.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieOrcConfig.java index c45e02452e3..7cac57fa919 100644 --- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieOrcConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieOrcConfig.java @@ -18,23 +18,24 @@ package org.apache.hudi.io.storage; -import org.apache.hadoop.conf.Configuration; import org.apache.hudi.common.bloom.BloomFilter; +import org.apache.hudi.storage.StorageConfiguration; + import org.apache.orc.CompressionKind; public class HoodieOrcConfig { - static final String AVRO_SCHEMA_METADATA_KEY = "orc.avro.schema"; + public static final String AVRO_SCHEMA_METADATA_KEY = "orc.avro.schema"; private final CompressionKind compressionKind; private final int stripeSize; private final int blockSize; private final long maxFileSize; - private final Configuration hadoopConf; + private final StorageConfiguration<?> storageConf; private final BloomFilter bloomFilter; - public HoodieOrcConfig(Configuration hadoopConf, CompressionKind compressionKind, int stripeSize, + public HoodieOrcConfig(StorageConfiguration<?> storageConf, CompressionKind compressionKind, int stripeSize, int blockSize, long maxFileSize, BloomFilter bloomFilter) { - this.hadoopConf = hadoopConf; + this.storageConf = storageConf; this.compressionKind = compressionKind; this.stripeSize = stripeSize; this.blockSize = blockSize; @@ -42,8 +43,8 @@ public class HoodieOrcConfig { this.bloomFilter = bloomFilter; } - public Configuration getHadoopConf() { - return hadoopConf; + public StorageConfiguration<?> getStorageConf() { + return storageConf; } public CompressionKind getCompressionKind() { diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetConfig.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetConfig.java index b5e567b7644..e17a017d679 100644 --- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetConfig.java @@ -18,7 +18,8 @@ package org.apache.hudi.io.storage; -import org.apache.hadoop.conf.Configuration; +import org.apache.hudi.storage.StorageConfiguration; + import org.apache.parquet.hadoop.metadata.CompressionCodecName; /** @@ -31,18 +32,18 @@ public class HoodieParquetConfig<T> { private final int blockSize; private final int pageSize; private final long maxFileSize; - private final Configuration hadoopConf; + private final StorageConfiguration<?> storageConf; private final double compressionRatio; private final boolean dictionaryEnabled; - public HoodieParquetConfig(T writeSupport, CompressionCodecName compressionCodecName, int blockSize, - int pageSize, long maxFileSize, Configuration hadoopConf, double compressionRatio, boolean dictionaryEnabled) { + public HoodieParquetConfig(T writeSupport, CompressionCodecName compressionCodecName, int blockSize, int pageSize, + long maxFileSize, StorageConfiguration<?> storageConf, double compressionRatio, boolean dictionaryEnabled) { this.writeSupport = writeSupport; this.compressionCodecName = compressionCodecName; this.blockSize = blockSize; this.pageSize = pageSize; this.maxFileSize = maxFileSize; - this.hadoopConf = hadoopConf; + this.storageConf = storageConf; this.compressionRatio = compressionRatio; this.dictionaryEnabled = dictionaryEnabled; } @@ -63,8 +64,8 @@ public class HoodieParquetConfig<T> { return maxFileSize; } - public Configuration getHadoopConf() { - return hadoopConf; + public StorageConfiguration<?> getStorageConf() { + return storageConf; } public double getCompressionRatio() { diff --git a/hudi-common/src/test/java/org/apache/hudi/io/storage/TestHoodieReaderWriterUtils.java b/hudi-common/src/test/java/org/apache/hudi/io/storage/TestHoodieReaderWriterUtils.java index a0ec0dfdb89..2fc38c156a3 100644 --- a/hudi-common/src/test/java/org/apache/hudi/io/storage/TestHoodieReaderWriterUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/io/storage/TestHoodieReaderWriterUtils.java @@ -44,7 +44,7 @@ import static org.apache.hudi.io.hfile.TestHFileReader.DUMMY_BLOOM_FILTER; * Utils for reader and writer tests. */ public class TestHoodieReaderWriterUtils { - static void writeHFileForTesting(String fileLocation, + public static void writeHFileForTesting(String fileLocation, int blockSize, Compression.Algorithm compressionAlgo, int numEntries, diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReaderFactory.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroFileReaderFactory.java similarity index 81% rename from hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReaderFactory.java rename to hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroFileReaderFactory.java index 6a6b0b67aa5..3a4d0b910ab 100644 --- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReaderFactory.java +++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroFileReaderFactory.java @@ -7,19 +7,25 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ -package org.apache.hudi.io.storage; +package org.apache.hudi.io.hadoop; import org.apache.hudi.common.config.HoodieConfig; import org.apache.hudi.common.util.Option; +import org.apache.hudi.io.storage.HoodieAvroBootstrapFileReader; +import org.apache.hudi.io.storage.HoodieFileReader; +import org.apache.hudi.io.storage.HoodieFileReaderFactory; +import org.apache.hudi.io.storage.HoodieHBaseAvroHFileReader; +import org.apache.hudi.io.storage.HoodieNativeAvroHFileReader; import org.apache.hudi.storage.HoodieStorage; import org.apache.hudi.storage.HoodieStorageUtils; import org.apache.hudi.storage.StorageConfiguration; diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileWriterFactory.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroFileWriterFactory.java similarity index 80% rename from hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileWriterFactory.java rename to hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroFileWriterFactory.java index 2a727158e17..d0b8faa7589 100644 --- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileWriterFactory.java +++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroFileWriterFactory.java @@ -7,16 +7,17 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ -package org.apache.hudi.io.storage; +package org.apache.hudi.io.hadoop; import org.apache.hudi.avro.HoodieAvroWriteSupport; import org.apache.hudi.common.bloom.BloomFilter; @@ -27,6 +28,11 @@ import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase; +import org.apache.hudi.io.storage.HoodieFileWriter; +import org.apache.hudi.io.storage.HoodieFileWriterFactory; +import org.apache.hudi.io.storage.HoodieOrcConfig; +import org.apache.hudi.io.storage.HoodieParquetConfig; import org.apache.hudi.storage.StorageConfiguration; import org.apache.hudi.storage.StoragePath; @@ -40,18 +46,19 @@ import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.parquet.schema.MessageType; import java.io.IOException; +import java.io.OutputStream; import java.util.Properties; -import static org.apache.hudi.io.storage.HoodieHFileConfig.CACHE_DATA_IN_L1; -import static org.apache.hudi.io.storage.HoodieHFileConfig.DROP_BEHIND_CACHE_COMPACTION; -import static org.apache.hudi.io.storage.HoodieHFileConfig.HFILE_COMPARATOR; -import static org.apache.hudi.io.storage.HoodieHFileConfig.PREFETCH_ON_OPEN; +import static org.apache.hudi.io.hadoop.HoodieHFileConfig.CACHE_DATA_IN_L1; +import static org.apache.hudi.io.hadoop.HoodieHFileConfig.DROP_BEHIND_CACHE_COMPACTION; +import static org.apache.hudi.io.hadoop.HoodieHFileConfig.HFILE_COMPARATOR; +import static org.apache.hudi.io.hadoop.HoodieHFileConfig.PREFETCH_ON_OPEN; public class HoodieAvroFileWriterFactory extends HoodieFileWriterFactory { //hardcoded classes to remove at a later time - public static final String HOODIE_AVRO_PARQUET_WRITER = "org.apache.hudi.io.storage.HoodieAvroParquetWriter"; - public static final String HOODIE_AVRO_HFILE_WRITER = "org.apache.hudi.io.storage.HoodieAvroHFileWriter"; - public static final String HOODIE_AVRO_ORC_WRITER = "org.apache.hudi.io.storage.HoodieAvroOrcWriter"; + public static final String HOODIE_AVRO_PARQUET_WRITER = "org.apache.hudi.io.hadoop.HoodieAvroParquetWriter"; + public static final String HOODIE_AVRO_HFILE_WRITER = "org.apache.hudi.io.hadoop.HoodieAvroHFileWriter"; + public static final String HOODIE_AVRO_ORC_WRITER = "org.apache.hudi.io.hadoop.HoodieAvroOrcWriter"; @Override protected HoodieFileWriter newParquetFileWriter( @@ -70,7 +77,7 @@ public class HoodieAvroFileWriterFactory extends HoodieFileWriterFactory { config.getIntOrDefault(HoodieStorageConfig.PARQUET_BLOCK_SIZE), config.getIntOrDefault(HoodieStorageConfig.PARQUET_PAGE_SIZE), config.getLongOrDefault(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE), - conf.unwrapAs(Configuration.class), config.getDoubleOrDefault(HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION), + conf, config.getDoubleOrDefault(HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION), config.getBooleanOrDefault(HoodieStorageConfig.PARQUET_DICTIONARY_ENABLED)); try { return (HoodieFileWriter) ReflectionUtils.loadClass(HOODIE_AVRO_PARQUET_WRITER, @@ -83,16 +90,16 @@ public class HoodieAvroFileWriterFactory extends HoodieFileWriterFactory { } protected HoodieFileWriter newParquetFileWriter( - FSDataOutputStream outputStream, StorageConfiguration<?> conf, HoodieConfig config, Schema schema) throws IOException { + OutputStream outputStream, StorageConfiguration<?> conf, HoodieConfig config, Schema schema) throws IOException { HoodieAvroWriteSupport writeSupport = getHoodieAvroWriteSupport(conf, schema, config, false); HoodieParquetConfig<HoodieAvroWriteSupport> parquetConfig = new HoodieParquetConfig<>(writeSupport, CompressionCodecName.fromConf(config.getString(HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME)), config.getInt(HoodieStorageConfig.PARQUET_BLOCK_SIZE), config.getInt(HoodieStorageConfig.PARQUET_PAGE_SIZE), config.getLong(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE), // todo: 1024*1024*1024 - conf.unwrapAs(Configuration.class), config.getDouble(HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION), + conf, config.getDouble(HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION), config.getBoolean(HoodieStorageConfig.PARQUET_DICTIONARY_ENABLED)); - return new HoodieParquetStreamWriter(outputStream, parquetConfig); + return new HoodieParquetStreamWriter(new FSDataOutputStream(outputStream, null), parquetConfig); } protected HoodieFileWriter newHFileFileWriter( @@ -120,7 +127,7 @@ public class HoodieAvroFileWriterFactory extends HoodieFileWriterFactory { String instantTime, StoragePath path, StorageConfiguration<?> conf, HoodieConfig config, Schema schema, TaskContextSupplier taskContextSupplier) throws IOException { BloomFilter filter = createBloomFilter(config); - HoodieOrcConfig orcConfig = new HoodieOrcConfig(conf.unwrapAs(Configuration.class), + HoodieOrcConfig orcConfig = new HoodieOrcConfig(conf, CompressionKind.valueOf(config.getString(HoodieStorageConfig.ORC_COMPRESSION_CODEC_NAME)), config.getInt(HoodieStorageConfig.ORC_STRIPE_SIZE), config.getInt(HoodieStorageConfig.ORC_BLOCK_SIZE), diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileWriter.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroHFileWriter.java similarity index 93% rename from hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileWriter.java rename to hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroHFileWriter.java index a1ffef280f5..d3d66b5c978 100644 --- a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileWriter.java +++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroHFileWriter.java @@ -7,16 +7,17 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ -package org.apache.hudi.io.storage; +package org.apache.hudi.io.hadoop; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.bloom.BloomFilter; @@ -26,6 +27,8 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.exception.HoodieDuplicateKeyException; import org.apache.hudi.hadoop.fs.HadoopFSUtils; import org.apache.hudi.hadoop.fs.HoodieWrapperFileSystem; +import org.apache.hudi.io.storage.HoodieAvroFileWriter; +import org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase; import org.apache.hudi.storage.StoragePath; import org.apache.avro.Schema; diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroOrcReader.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroOrcReader.java similarity index 83% rename from hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroOrcReader.java rename to hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroOrcReader.java index f119c44fd79..e4ac961065b 100644 --- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroOrcReader.java +++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroOrcReader.java @@ -7,24 +7,27 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ -package org.apache.hudi.io.storage; +package org.apache.hudi.io.hadoop; import org.apache.hudi.common.bloom.BloomFilter; import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.util.AvroOrcUtils; import org.apache.hudi.common.util.BaseFileUtils; -import org.apache.hudi.common.util.collection.ClosableIterator; import org.apache.hudi.common.util.OrcReaderIterator; +import org.apache.hudi.common.util.collection.ClosableIterator; import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.io.storage.HoodieAvroFileReader; +import org.apache.hudi.io.storage.HoodieFileReader; import org.apache.hudi.storage.StorageConfiguration; import org.apache.hudi.storage.StoragePath; @@ -46,7 +49,7 @@ import java.util.Set; /** * {@link HoodieFileReader} implementation for ORC format. */ -public class HoodieAvroOrcReader extends HoodieAvroFileReaderBase { +public class HoodieAvroOrcReader extends HoodieAvroFileReader { private final StoragePath path; private final StorageConfiguration<?> conf; @@ -74,7 +77,7 @@ public class HoodieAvroOrcReader extends HoodieAvroFileReaderBase { } @Override - protected ClosableIterator<IndexedRecord> getIndexedRecordIterator(Schema readerSchema, Schema requestedSchema) { + public ClosableIterator<IndexedRecord> getIndexedRecordIterator(Schema readerSchema, Schema requestedSchema) { if (!Objects.equals(readerSchema, requestedSchema)) { throw new UnsupportedOperationException("Schema projections are not supported in HFile reader"); } diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroOrcWriter.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroOrcWriter.java similarity index 91% rename from hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroOrcWriter.java rename to hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroOrcWriter.java index 07e7bc7f122..40e37fa145f 100644 --- a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroOrcWriter.java +++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroOrcWriter.java @@ -7,16 +7,17 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ -package org.apache.hudi.io.storage; +package org.apache.hudi.io.hadoop; import org.apache.hudi.avro.HoodieAvroWriteSupport; import org.apache.hudi.avro.HoodieBloomFilterWriteSupport; @@ -27,6 +28,8 @@ import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.util.AvroOrcUtils; import org.apache.hudi.hadoop.fs.HadoopFSUtils; import org.apache.hudi.hadoop.fs.HoodieWrapperFileSystem; +import org.apache.hudi.io.storage.HoodieAvroFileWriter; +import org.apache.hudi.io.storage.HoodieOrcConfig; import org.apache.hudi.storage.StoragePath; import org.apache.avro.Schema; @@ -70,7 +73,7 @@ public class HoodieAvroOrcWriter implements HoodieAvroFileWriter, Closeable { public HoodieAvroOrcWriter(String instantTime, StoragePath file, HoodieOrcConfig config, Schema schema, TaskContextSupplier taskContextSupplier) throws IOException { - Configuration conf = HadoopFSUtils.registerFileSystem(file, config.getHadoopConf()); + Configuration conf = HadoopFSUtils.registerFileSystem(file, config.getStorageConf().unwrapAs(Configuration.class)); this.file = HoodieWrapperFileSystem.convertToHoodiePath(file, conf); this.fs = (HoodieWrapperFileSystem) this.file.getFileSystem(conf); this.instantTime = instantTime; diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetReader.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReader.java similarity index 92% rename from hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetReader.java rename to hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReader.java index 2283afd31a3..25ad701e01d 100644 --- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetReader.java +++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReader.java @@ -7,16 +7,17 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ -package org.apache.hudi.io.storage; +package org.apache.hudi.io.hadoop; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.bloom.BloomFilter; @@ -28,6 +29,8 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ParquetReaderIterator; import org.apache.hudi.common.util.collection.ClosableIterator; import org.apache.hudi.common.util.collection.CloseableMappingIterator; +import org.apache.hudi.io.storage.HoodieAvroFileReader; +import org.apache.hudi.io.storage.HoodieFileReader; import org.apache.hudi.storage.StorageConfiguration; import org.apache.hudi.storage.StoragePath; @@ -51,7 +54,7 @@ import static org.apache.hudi.common.util.TypeUtils.unsafeCast; /** * {@link HoodieFileReader} implementation for parquet format. */ -public class HoodieAvroParquetReader extends HoodieAvroFileReaderBase { +public class HoodieAvroParquetReader extends HoodieAvroFileReader { private final StoragePath path; private final StorageConfiguration<?> conf; @@ -96,7 +99,7 @@ public class HoodieAvroParquetReader extends HoodieAvroFileReaderBase { } @Override - protected ClosableIterator<IndexedRecord> getIndexedRecordIterator(Schema readerSchema, Schema requestedSchema) throws IOException { + public ClosableIterator<IndexedRecord> getIndexedRecordIterator(Schema readerSchema, Schema requestedSchema) throws IOException { return getIndexedRecordIteratorInternal(readerSchema, Option.of(requestedSchema)); } diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetWriter.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetWriter.java similarity index 84% rename from hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetWriter.java rename to hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetWriter.java index 4269e6513a2..f8f9a8ccea0 100644 --- a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetWriter.java +++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetWriter.java @@ -7,20 +7,23 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ -package org.apache.hudi.io.storage; +package org.apache.hudi.io.hadoop; import org.apache.hudi.avro.HoodieAvroWriteSupport; import org.apache.hudi.common.engine.TaskContextSupplier; import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.io.storage.HoodieAvroFileWriter; +import org.apache.hudi.io.storage.HoodieParquetConfig; import org.apache.hudi.storage.StoragePath; import org.apache.avro.generic.IndexedRecord; diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/HoodieBaseParquetWriter.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieBaseParquetWriter.java similarity index 90% rename from hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/HoodieBaseParquetWriter.java rename to hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieBaseParquetWriter.java index 06f1e513055..8f17fa0fa1e 100644 --- a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/HoodieBaseParquetWriter.java +++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieBaseParquetWriter.java @@ -7,20 +7,22 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ -package org.apache.hudi.io.storage; +package org.apache.hudi.io.hadoop; import org.apache.hudi.common.util.VisibleForTesting; import org.apache.hudi.hadoop.fs.HadoopFSUtils; import org.apache.hudi.hadoop.fs.HoodieWrapperFileSystem; +import org.apache.hudi.io.storage.HoodieParquetConfig; import org.apache.hudi.storage.StoragePath; import org.apache.hadoop.conf.Configuration; @@ -52,8 +54,9 @@ public abstract class HoodieBaseParquetWriter<R> implements Closeable { public HoodieBaseParquetWriter(StoragePath file, HoodieParquetConfig<? extends WriteSupport<R>> parquetConfig) throws IOException { + Configuration hadoopConf = parquetConfig.getStorageConf().unwrapAs(Configuration.class); ParquetWriter.Builder parquetWriterbuilder = new ParquetWriter.Builder( - HoodieWrapperFileSystem.convertToHoodiePath(file, parquetConfig.getHadoopConf())) { + HoodieWrapperFileSystem.convertToHoodiePath(file, hadoopConf)) { @Override protected ParquetWriter.Builder self() { return this; @@ -73,8 +76,8 @@ public abstract class HoodieBaseParquetWriter<R> implements Closeable { parquetWriterbuilder.withDictionaryEncoding(parquetConfig.dictionaryEnabled()); parquetWriterbuilder.withValidation(ParquetWriter.DEFAULT_IS_VALIDATING_ENABLED); parquetWriterbuilder.withWriterVersion(ParquetWriter.DEFAULT_WRITER_VERSION); - parquetWriterbuilder.withConf(HadoopFSUtils.registerFileSystem(file, parquetConfig.getHadoopConf())); - handleParquetBloomFilters(parquetWriterbuilder, parquetConfig.getHadoopConf()); + parquetWriterbuilder.withConf(HadoopFSUtils.registerFileSystem(file, hadoopConf)); + handleParquetBloomFilters(parquetWriterbuilder, hadoopConf); parquetWriter = parquetWriterbuilder.build(); // We cannot accurately measure the snappy compressed output file size. We are choosing a diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileConfig.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieHFileConfig.java similarity index 87% rename from hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileConfig.java rename to hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieHFileConfig.java index 64cc607ef63..83b659a6be0 100644 --- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileConfig.java +++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieHFileConfig.java @@ -7,18 +7,20 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ -package org.apache.hudi.io.storage; +package org.apache.hudi.io.hadoop; import org.apache.hudi.common.bloom.BloomFilter; +import org.apache.hudi.io.storage.HoodieHBaseKVComparator; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CellComparator; diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetStreamWriter.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieParquetStreamWriter.java similarity index 84% rename from hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetStreamWriter.java rename to hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieParquetStreamWriter.java index 226266bf6cf..5fdd6505733 100644 --- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetStreamWriter.java +++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieParquetStreamWriter.java @@ -7,19 +7,22 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ -package org.apache.hudi.io.storage; +package org.apache.hudi.io.hadoop; import org.apache.hudi.avro.HoodieAvroWriteSupport; import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.io.storage.HoodieAvroFileWriter; +import org.apache.hudi.io.storage.HoodieParquetConfig; import org.apache.hudi.parquet.io.OutputStreamBackedOutputFile; import org.apache.avro.generic.IndexedRecord; @@ -54,7 +57,7 @@ public class HoodieParquetStreamWriter implements HoodieAvroFileWriter, AutoClos .withDictionaryPageSize(parquetConfig.getPageSize()) .withDictionaryEncoding(parquetConfig.dictionaryEnabled()) .withWriterVersion(ParquetWriter.DEFAULT_WRITER_VERSION) - .withConf(parquetConfig.getHadoopConf()) + .withConf(parquetConfig.getStorageConf().unwrapAs(Configuration.class)) .build(); } diff --git a/hudi-common/src/main/java/org/apache/hudi/parquet/io/OutputStreamBackedOutputFile.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/parquet/io/OutputStreamBackedOutputFile.java similarity index 100% rename from hudi-common/src/main/java/org/apache/hudi/parquet/io/OutputStreamBackedOutputFile.java rename to hudi-hadoop-common/src/main/java/org/apache/hudi/parquet/io/OutputStreamBackedOutputFile.java diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieAvroFileReaderFactory.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieAvroFileReaderFactory.java similarity index 83% rename from hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieAvroFileReaderFactory.java rename to hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieAvroFileReaderFactory.java index 96b8ea9e6b3..7faf84a1ee5 100644 --- a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieAvroFileReaderFactory.java +++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieAvroFileReaderFactory.java @@ -7,19 +7,22 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ -package org.apache.hudi.io.storage; +package org.apache.hudi.io.hadoop; import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType; import org.apache.hudi.hadoop.fs.HadoopFSUtils; +import org.apache.hudi.io.storage.HoodieFileReader; +import org.apache.hudi.io.storage.HoodieFileReaderFactory; import org.apache.hudi.storage.StorageConfiguration; import org.apache.hudi.storage.StoragePath; diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieBaseParquetWriter.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieBaseParquetWriter.java similarity index 86% rename from hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieBaseParquetWriter.java rename to hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieBaseParquetWriter.java index f9909b0f5f2..82a80b1ce26 100644 --- a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieBaseParquetWriter.java +++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieBaseParquetWriter.java @@ -7,28 +7,31 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ -package org.apache.hudi.io.storage; +package org.apache.hudi.io.hadoop; import org.apache.hudi.avro.HoodieAvroWriteSupport; import org.apache.hudi.common.bloom.BloomFilter; import org.apache.hudi.common.bloom.BloomFilterFactory; import org.apache.hudi.common.bloom.BloomFilterTypeCode; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; +import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.common.util.Option; +import org.apache.hudi.io.storage.HoodieParquetConfig; +import org.apache.hudi.storage.StorageConfiguration; import org.apache.hudi.storage.StoragePath; import org.apache.avro.Schema; import org.apache.avro.generic.IndexedRecord; -import org.apache.hadoop.conf.Configuration; import org.apache.parquet.avro.AvroSchemaConverter; import org.apache.parquet.hadoop.ParquetWriter; import org.apache.parquet.hadoop.metadata.CompressionCodecName; @@ -83,7 +86,7 @@ public class TestHoodieBaseParquetWriter { public void testCanWrite() throws IOException { BloomFilter filter = BloomFilterFactory.createBloomFilter(1000, 0.0001, 10000, BloomFilterTypeCode.DYNAMIC_V0.name()); - Configuration hadoopConf = new Configuration(); + StorageConfiguration conf = HoodieTestUtils.getDefaultStorageConfWithDefaults(); Schema schema = new Schema.Parser().parse(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA); HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(new AvroSchemaConverter().convert(schema), @@ -92,7 +95,7 @@ public class TestHoodieBaseParquetWriter { long maxFileSize = 2 * 1024 * 1024; HoodieParquetConfig<HoodieAvroWriteSupport> parquetConfig = new HoodieParquetConfig<>(writeSupport, CompressionCodecName.GZIP, ParquetWriter.DEFAULT_BLOCK_SIZE, - ParquetWriter.DEFAULT_PAGE_SIZE, maxFileSize, hadoopConf, 0, true); + ParquetWriter.DEFAULT_PAGE_SIZE, maxFileSize, conf, 0, true); StoragePath filePath = new StoragePath( new StoragePath(tempDir.toUri()), "test_fileSize.parquet"); diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHBaseHFileReaderWriter.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHBaseHFileReaderWriter.java similarity index 90% rename from hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHBaseHFileReaderWriter.java rename to hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHBaseHFileReaderWriter.java index d6af1db8cba..ca45ece4982 100644 --- a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHBaseHFileReaderWriter.java +++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHBaseHFileReaderWriter.java @@ -7,19 +7,24 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ -package org.apache.hudi.io.storage; +package org.apache.hudi.io.hadoop; import org.apache.hudi.common.util.Option; import org.apache.hudi.hadoop.fs.HadoopFSUtils; +import org.apache.hudi.io.storage.HoodieAvroFileReader; +import org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase; +import org.apache.hudi.io.storage.HoodieHBaseAvroHFileReader; +import org.apache.hudi.io.storage.HoodieHFileUtils; import org.apache.hudi.storage.HoodieStorage; import org.apache.hudi.storage.HoodieStorageUtils; import org.apache.hudi.storage.StorageConfiguration; diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHFileReaderWriter.java similarity index 85% rename from hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java rename to hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHFileReaderWriter.java index 6fe0e2ffea5..b87af2c8371 100644 --- a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java +++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHFileReaderWriter.java @@ -7,19 +7,23 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ -package org.apache.hudi.io.storage; +package org.apache.hudi.io.hadoop; import org.apache.hudi.common.util.Option; import org.apache.hudi.hadoop.fs.HadoopFSUtils; +import org.apache.hudi.io.storage.HoodieAvroFileReader; +import org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase; +import org.apache.hudi.io.storage.HoodieNativeAvroHFileReader; import org.apache.hudi.storage.StorageConfiguration; import org.apache.avro.Schema; diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriterBase.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHFileReaderWriterBase.java similarity index 98% rename from hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriterBase.java rename to hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHFileReaderWriterBase.java index 856e73197a2..1d69115315a 100644 --- a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriterBase.java +++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHFileReaderWriterBase.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.hudi.io.storage; +package org.apache.hudi.io.hadoop; import org.apache.hudi.common.bootstrap.index.HFileBootstrapIndex; import org.apache.hudi.common.config.HoodieStorageConfig; @@ -29,6 +29,9 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.util.FileIOUtils; import org.apache.hudi.hadoop.fs.HadoopFSUtils; +import org.apache.hudi.io.storage.HoodieAvroFileReader; +import org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase; +import org.apache.hudi.io.storage.HoodieFileWriterFactory; import org.apache.hudi.storage.HoodieStorage; import org.apache.hudi.storage.HoodieStorageUtils; import org.apache.hudi.storage.StorageConfiguration; @@ -75,7 +78,7 @@ import static org.apache.hudi.io.hfile.TestHFileReader.BOOTSTRAP_INDEX_HFILE_SUF import static org.apache.hudi.io.hfile.TestHFileReader.COMPLEX_SCHEMA_HFILE_SUFFIX; import static org.apache.hudi.io.hfile.TestHFileReader.SIMPLE_SCHEMA_HFILE_SUFFIX; import static org.apache.hudi.io.hfile.TestHFileReader.readHFileFromResources; -import static org.apache.hudi.io.storage.HoodieHFileConfig.HFILE_COMPARATOR; +import static org.apache.hudi.io.hadoop.HoodieHFileConfig.HFILE_COMPARATOR; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieOrcReaderWriter.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieOrcReaderWriter.java similarity index 87% rename from hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieOrcReaderWriter.java rename to hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieOrcReaderWriter.java index bc719be8bc8..6a94a32ed3c 100644 --- a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieOrcReaderWriter.java +++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieOrcReaderWriter.java @@ -7,16 +7,17 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ -package org.apache.hudi.io.storage; +package org.apache.hudi.io.hadoop; import org.apache.hudi.avro.HoodieBloomFilterWriteSupport; import org.apache.hudi.common.bloom.BloomFilter; @@ -25,6 +26,10 @@ import org.apache.hudi.common.bloom.BloomFilterTypeCode; import org.apache.hudi.common.config.HoodieStorageConfig; import org.apache.hudi.common.engine.TaskContextSupplier; import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType; +import org.apache.hudi.common.testutils.HoodieTestUtils; +import org.apache.hudi.io.storage.HoodieAvroFileReader; +import org.apache.hudi.io.storage.HoodieFileReaderFactory; +import org.apache.hudi.io.storage.HoodieOrcConfig; import org.apache.hudi.storage.StorageConfiguration; import org.apache.hudi.storage.StoragePath; @@ -57,7 +62,7 @@ public class TestHoodieOrcReaderWriter extends TestHoodieReaderWriterBase { protected HoodieAvroOrcWriter createWriter( Schema avroSchema, boolean populateMetaFields) throws Exception { BloomFilter filter = BloomFilterFactory.createBloomFilter(1000, 0.00001, -1, BloomFilterTypeCode.SIMPLE.name()); - Configuration conf = new Configuration(); + StorageConfiguration conf = HoodieTestUtils.getDefaultStorageConfWithDefaults(); int orcStripSize = Integer.parseInt(HoodieStorageConfig.ORC_STRIPE_SIZE.defaultValue()); int orcBlockSize = Integer.parseInt(HoodieStorageConfig.ORC_BLOCK_SIZE.defaultValue()); int maxFileSize = Integer.parseInt(HoodieStorageConfig.ORC_FILE_MAX_SIZE.defaultValue()); diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieReaderWriterBase.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieReaderWriterBase.java similarity index 97% rename from hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieReaderWriterBase.java rename to hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieReaderWriterBase.java index 5f1e7d1c04a..1bd376e4139 100644 --- a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieReaderWriterBase.java +++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieReaderWriterBase.java @@ -17,13 +17,17 @@ * under the License. */ -package org.apache.hudi.io.storage; +package org.apache.hudi.io.hadoop; import org.apache.hudi.common.bloom.BloomFilter; import org.apache.hudi.common.model.HoodieAvroIndexedRecord; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.hadoop.fs.HadoopFSUtils; +import org.apache.hudi.io.storage.HoodieAvroFileReader; +import org.apache.hudi.io.storage.HoodieAvroFileWriter; +import org.apache.hudi.io.storage.HoodieFileReader; +import org.apache.hudi.io.storage.HoodieFileWriter; import org.apache.hudi.storage.StorageConfiguration; import org.apache.hudi.storage.StoragePath; diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieMergeOnReadSnapshotReader.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieMergeOnReadSnapshotReader.java index 86f7f6c82a8..30ac00b0b0d 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieMergeOnReadSnapshotReader.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieMergeOnReadSnapshotReader.java @@ -37,7 +37,6 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.hadoop.config.HoodieRealtimeConfig; -import org.apache.hudi.hadoop.fs.HadoopFSUtils; import org.apache.hudi.hadoop.testutils.InputFormatTestUtil; import org.apache.hudi.storage.HoodieStorage; import org.apache.hudi.storage.HoodieStorageUtils; diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/SparkHelpers.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/SparkHelpers.scala index 4d925d3d4ed..791435f4bb7 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/SparkHelpers.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/SparkHelpers.scala @@ -24,12 +24,13 @@ import org.apache.hudi.common.config.HoodieStorageConfig import org.apache.hudi.common.config.HoodieStorageConfig.{BLOOM_FILTER_DYNAMIC_MAX_ENTRIES, BLOOM_FILTER_FPP_VALUE, BLOOM_FILTER_NUM_ENTRIES_VALUE, BLOOM_FILTER_TYPE} import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord} import org.apache.hudi.common.util.{BaseFileUtils, Option} -import org.apache.hudi.io.storage.{HoodieAvroParquetWriter, HoodieParquetConfig} +import org.apache.hudi.io.storage.HoodieParquetConfig import org.apache.hudi.storage.{HoodieStorage, StorageConfiguration, StoragePath} import org.apache.avro.Schema import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.FileSystem +import org.apache.hudi.io.hadoop.HoodieAvroParquetWriter import org.apache.parquet.avro.AvroSchemaConverter import org.apache.parquet.hadoop.metadata.CompressionCodecName import org.apache.spark.sql.{DataFrame, SQLContext} @@ -61,12 +62,12 @@ object SparkHelpers { HoodieStorageConfig.PARQUET_BLOCK_SIZE.defaultValue.toInt, HoodieStorageConfig.PARQUET_PAGE_SIZE.defaultValue.toInt, HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.defaultValue.toInt, - conf.unwrap(), + conf, HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION.defaultValue.toDouble, HoodieStorageConfig.PARQUET_DICTIONARY_ENABLED.defaultValue) // Add current classLoad for config, if not will throw classNotFound of 'HoodieWrapperFileSystem'. - parquetConfig.getHadoopConf().setClassLoader(Thread.currentThread.getContextClassLoader) + conf.unwrap().setClassLoader(Thread.currentThread.getContextClassLoader) val writer = new HoodieAvroParquetWriter(destinationFile, parquetConfig, instantTime, new SparkTaskContextSupplier(), true) for (rec <- sourceRecords) { diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java index 0a7e98accb3..2b371cf1db3 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java @@ -55,7 +55,7 @@ import org.apache.hudi.hadoop.HoodieParquetInputFormat; import org.apache.hudi.hadoop.fs.HadoopFSUtils; import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat; import org.apache.hudi.index.HoodieIndex.IndexType; -import org.apache.hudi.io.storage.HoodieAvroParquetReader; +import org.apache.hudi.io.hadoop.HoodieAvroParquetReader; import org.apache.hudi.keygen.NonpartitionedKeyGenerator; import org.apache.hudi.keygen.SimpleKeyGenerator; import org.apache.hudi.storage.StoragePath; diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/storage/row/TestHoodieInternalRowParquetWriter.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/storage/row/TestHoodieInternalRowParquetWriter.java index 65d140da8b3..95f151336c7 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/storage/row/TestHoodieInternalRowParquetWriter.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/storage/row/TestHoodieInternalRowParquetWriter.java @@ -29,6 +29,7 @@ import org.apache.hudi.common.util.ParquetUtils; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.io.storage.HoodieParquetConfig; import org.apache.hudi.storage.StoragePath; +import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration; import org.apache.hudi.testutils.HoodieSparkClientTestHarness; import org.apache.hudi.testutils.SparkDatasetTestUtils; @@ -89,7 +90,7 @@ public class TestHoodieInternalRowParquetWriter extends HoodieSparkClientTestHar HoodieWriteConfig cfg = writeConfigBuilder.build(); HoodieParquetConfig<HoodieRowParquetWriteSupport> parquetConfig = new HoodieParquetConfig<>(writeSupport, CompressionCodecName.SNAPPY, cfg.getParquetBlockSize(), cfg.getParquetPageSize(), cfg.getParquetMaxFileSize(), - writeSupport.getHadoopConf(), cfg.getParquetCompressionRatio(), cfg.parquetDictionaryEnabled()); + new HadoopStorageConfiguration(writeSupport.getHadoopConf()), cfg.getParquetCompressionRatio(), cfg.parquetDictionaryEnabled()); StoragePath filePath = new StoragePath(basePath + "/internal_row_writer.parquet");
