This is an automated email from the ASF dual-hosted git repository.
jonvex pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new e42217d2368 [HUDI-7350] Make Hudi reader and writer factory APIs
Hadoop-independent (#11163)
e42217d2368 is described below
commit e42217d2368db493bc94930b511ec05c79fd9cdc
Author: Jon Vexler <[email protected]>
AuthorDate: Thu May 9 20:45:36 2024 -0400
[HUDI-7350] Make Hudi reader and writer factory APIs Hadoop-independent
(#11163)
Abstract io reader and writer to de-hadoop
---------
Co-authored-by: Jonathan Vexler <=>
---
.../hudi/client/timeline/LSMTimelineWriter.java | 2 +-
.../hudi/avro/TestHoodieAvroParquetWriter.java | 4 +-
.../hudi/testutils/HoodieWriteableTestTable.java | 10 ++--
.../row/HoodieRowDataFileWriterFactory.java | 3 +-
.../io/storage/row/HoodieRowDataParquetWriter.java | 2 +-
.../io/storage/HoodieSparkFileWriterFactory.java | 5 +-
.../hudi/io/storage/HoodieSparkParquetWriter.java | 1 +
.../row/HoodieInternalRowFileWriterFactory.java | 3 +-
.../row/HoodieInternalRowParquetWriter.java | 2 +-
.../io/storage/row/HoodieRowParquetConfig.java | 8 ++-
.../storage/TestHoodieAvroFileWriterFactory.java | 3 +
.../hudi/common/table/TableSchemaResolver.java | 6 +-
.../table/log/block/HoodieParquetDataBlock.java | 48 ++++++----------
.../table/timeline/HoodieArchivedTimeline.java | 4 +-
.../hudi/io/storage/HoodieAvroFileReader.java | 28 +++++++++-
.../hudi/io/storage/HoodieAvroFileReaderBase.java | 49 ----------------
.../io/storage/HoodieAvroHFileReaderImplBase.java | 4 +-
.../hudi/io/storage/HoodieFileReaderFactory.java | 11 +++-
.../hudi/io/storage/HoodieFileWriterFactory.java | 21 ++++---
.../apache/hudi/io/storage/HoodieOrcConfig.java | 15 ++---
.../hudi/io/storage/HoodieParquetConfig.java | 15 ++---
.../testutils/reader/HoodieFileSliceTestUtils.java | 65 ++++++++++------------
.../testutils/reader/HoodieTestReaderContext.java | 9 ++-
.../io/storage/TestHoodieReaderWriterUtils.java | 2 +-
.../io/hadoop}/HoodieAvroFileReaderFactory.java | 20 ++++---
.../io/hadoop}/HoodieAvroFileWriterFactory.java | 45 ++++++++-------
.../{storage => hadoop}/HoodieAvroHFileWriter.java | 17 +++---
.../hudi/io/hadoop}/HoodieAvroOrcReader.java | 19 ++++---
.../{storage => hadoop}/HoodieAvroOrcWriter.java | 19 ++++---
.../hudi/io/hadoop}/HoodieAvroParquetReader.java | 19 ++++---
.../HoodieAvroParquetWriter.java | 17 +++---
.../HoodieBaseParquetWriter.java | 23 ++++----
.../apache/hudi/io/hadoop}/HoodieHFileConfig.java | 16 +++---
.../hudi/io/hadoop}/HoodieParquetStreamWriter.java | 19 ++++---
.../parquet/io/OutputStreamBackedOutputFile.java | 0
.../TestHoodieAvroFileReaderFactory.java | 17 +++---
.../TestHoodieBaseParquetWriter.java | 23 ++++----
.../TestHoodieHBaseHFileReaderWriter.java | 19 ++++---
.../TestHoodieHFileReaderWriter.java | 18 +++---
.../TestHoodieHFileReaderWriterBase.java | 7 ++-
.../TestHoodieOrcReaderWriter.java | 21 ++++---
.../TestHoodieReaderWriterBase.java | 7 ++-
.../org/apache/spark/sql/hudi/SparkHelpers.scala | 7 ++-
.../org/apache/hudi/functional/TestBootstrap.java | 2 +-
.../row/TestHoodieInternalRowParquetWriter.java | 3 +-
45 files changed, 354 insertions(+), 304 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/LSMTimelineWriter.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/LSMTimelineWriter.java
index ccf2332699c..2ea57939427 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/LSMTimelineWriter.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/LSMTimelineWriter.java
@@ -38,7 +38,7 @@ import
org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.io.storage.HoodieAvroParquetReader;
+import org.apache.hudi.io.hadoop.HoodieAvroParquetReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.io.storage.HoodieFileWriter;
import org.apache.hudi.io.storage.HoodieFileWriterFactory;
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroParquetWriter.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroParquetWriter.java
index 091d1d7195a..bff523f7f21 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroParquetWriter.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroParquetWriter.java
@@ -25,7 +25,7 @@ import org.apache.hudi.common.bloom.BloomFilterTypeCode;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ParquetUtils;
-import org.apache.hudi.io.storage.HoodieAvroParquetWriter;
+import org.apache.hudi.io.hadoop.HoodieAvroParquetWriter;
import org.apache.hudi.io.storage.HoodieParquetConfig;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
@@ -71,7 +71,7 @@ public class TestHoodieAvroParquetWriter {
HoodieParquetConfig<HoodieAvroWriteSupport> parquetConfig =
new HoodieParquetConfig(writeSupport, CompressionCodecName.GZIP,
ParquetWriter.DEFAULT_BLOCK_SIZE,
- ParquetWriter.DEFAULT_PAGE_SIZE, 1024 * 1024 * 1024,
storageConf.unwrap(), 0.1, true);
+ ParquetWriter.DEFAULT_PAGE_SIZE, 1024 * 1024 * 1024, storageConf,
0.1, true);
StoragePath filePath = new
StoragePath(tmpDir.resolve("test.parquet").toAbsolutePath().toString());
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java
index a135a2b22e4..b45d80b1ee6 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java
@@ -39,18 +39,18 @@ import org.apache.hudi.common.testutils.FileCreateUtils;
import org.apache.hudi.common.testutils.HoodieMetadataTestTable;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.io.storage.HoodieAvroOrcWriter;
-import org.apache.hudi.io.storage.HoodieAvroParquetWriter;
+import org.apache.hudi.io.hadoop.HoodieAvroOrcWriter;
+import org.apache.hudi.io.hadoop.HoodieAvroParquetWriter;
import org.apache.hudi.io.storage.HoodieOrcConfig;
import org.apache.hudi.io.storage.HoodieParquetConfig;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
-import org.apache.hadoop.conf.Configuration;
import org.apache.orc.CompressionKind;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.hadoop.ParquetWriter;
@@ -124,7 +124,7 @@ public class HoodieWriteableTestTable extends
HoodieMetadataTestTable {
new AvroSchemaConverter().convert(schema), schema,
Option.of(filter), new Properties());
HoodieParquetConfig<HoodieAvroWriteSupport> config = new
HoodieParquetConfig<>(writeSupport, CompressionCodecName.GZIP,
ParquetWriter.DEFAULT_BLOCK_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE,
120 * 1024 * 1024,
- new Configuration(),
Double.parseDouble(HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION.defaultValue()),
true);
+ storage.getConf(),
Double.parseDouble(HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION.defaultValue()),
true);
try (HoodieAvroParquetWriter writer = new HoodieAvroParquetWriter(
new StoragePath(Paths.get(basePath, partition,
fileName).toString()), config, currentInstantTime,
contextSupplier, populateMetaFields)) {
@@ -142,7 +142,7 @@ public class HoodieWriteableTestTable extends
HoodieMetadataTestTable {
}
}
} else if
(HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().equals(HoodieFileFormat.ORC))
{
- Configuration conf = new Configuration();
+ StorageConfiguration conf = storage.getConf().newInstance();
int orcStripSize =
Integer.parseInt(HoodieStorageConfig.ORC_STRIPE_SIZE.defaultValue());
int orcBlockSize =
Integer.parseInt(HoodieStorageConfig.ORC_BLOCK_SIZE.defaultValue());
int maxFileSize =
Integer.parseInt(HoodieStorageConfig.ORC_FILE_MAX_SIZE.defaultValue());
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriterFactory.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriterFactory.java
index 072bde04756..e9bc86b4a76 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriterFactory.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriterFactory.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.io.storage.HoodieParquetConfig;
import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
import org.apache.hudi.table.HoodieTable;
import org.apache.flink.table.types.logical.RowType;
@@ -76,7 +77,7 @@ public class HoodieRowDataFileWriterFactory {
writeConfig.getParquetBlockSize(),
writeConfig.getParquetPageSize(),
writeConfig.getParquetMaxFileSize(),
- writeSupport.getHadoopConf(),
+ new HadoopStorageConfiguration(writeSupport.getHadoopConf()),
writeConfig.getParquetCompressionRatio(),
writeConfig.parquetDictionaryEnabled()));
}
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriter.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriter.java
index 8acd1ef9dd1..200662cc138 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriter.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriter.java
@@ -18,7 +18,7 @@
package org.apache.hudi.io.storage.row;
-import org.apache.hudi.io.storage.HoodieBaseParquetWriter;
+import org.apache.hudi.io.hadoop.HoodieBaseParquetWriter;
import org.apache.hudi.io.storage.HoodieParquetConfig;
import org.apache.hudi.storage.StoragePath;
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriterFactory.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriterFactory.java
index ee98ff322a3..ff17b48bf0c 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriterFactory.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriterFactory.java
@@ -38,6 +38,7 @@ import org.apache.spark.sql.HoodieInternalRowUtils;
import org.apache.spark.sql.types.StructType;
import java.io.IOException;
+import java.io.OutputStream;
public class HoodieSparkFileWriterFactory extends HoodieFileWriterFactory {
@@ -67,7 +68,7 @@ public class HoodieSparkFileWriterFactory extends
HoodieFileWriterFactory {
}
protected HoodieFileWriter newParquetFileWriter(
- FSDataOutputStream outputStream, StorageConfiguration<?> conf,
HoodieConfig config, Schema schema) throws IOException {
+ OutputStream outputStream, StorageConfiguration<?> conf, HoodieConfig
config, Schema schema) throws IOException {
boolean enableBloomFilter = false;
HoodieRowParquetWriteSupport writeSupport =
getHoodieRowParquetWriteSupport(conf, schema, config, enableBloomFilter);
String compressionCodecName =
config.getStringOrDefault(HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME);
@@ -83,7 +84,7 @@ public class HoodieSparkFileWriterFactory extends
HoodieFileWriterFactory {
writeSupport.getHadoopConf(),
config.getDouble(HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION),
config.getBooleanOrDefault(HoodieStorageConfig.PARQUET_DICTIONARY_ENABLED));
parquetConfig.getHadoopConf().addResource(writeSupport.getHadoopConf());
- return new HoodieSparkParquetStreamWriter(outputStream, parquetConfig);
+ return new HoodieSparkParquetStreamWriter(new
FSDataOutputStream(outputStream, null), parquetConfig);
}
@Override
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetWriter.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetWriter.java
index 09f8d8dbe1c..ba4ab63006d 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetWriter.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetWriter.java
@@ -21,6 +21,7 @@ package org.apache.hudi.io.storage;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.io.hadoop.HoodieBaseParquetWriter;
import org.apache.hudi.io.storage.row.HoodieRowParquetConfig;
import org.apache.hudi.io.storage.row.HoodieRowParquetWriteSupport;
import org.apache.hudi.storage.StoragePath;
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowFileWriterFactory.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowFileWriterFactory.java
index ffad5a895cb..8e7287a7024 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowFileWriterFactory.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowFileWriterFactory.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.io.storage.HoodieParquetConfig;
import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
import org.apache.hudi.table.HoodieTable;
import org.apache.hadoop.conf.Configuration;
@@ -79,7 +80,7 @@ public class HoodieInternalRowFileWriterFactory {
writeConfig.getParquetBlockSize(),
writeConfig.getParquetPageSize(),
writeConfig.getParquetMaxFileSize(),
- writeSupport.getHadoopConf(),
+ new HadoopStorageConfiguration(writeSupport.getHadoopConf()),
writeConfig.getParquetCompressionRatio(),
writeConfig.parquetDictionaryEnabled()
));
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowParquetWriter.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowParquetWriter.java
index dcb1f197a04..f7ad33d2cbb 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowParquetWriter.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowParquetWriter.java
@@ -18,7 +18,7 @@
package org.apache.hudi.io.storage.row;
-import org.apache.hudi.io.storage.HoodieBaseParquetWriter;
+import org.apache.hudi.io.hadoop.HoodieBaseParquetWriter;
import org.apache.hudi.io.storage.HoodieParquetConfig;
import org.apache.hudi.storage.StoragePath;
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetConfig.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetConfig.java
index f5f6d7b0a5b..f3b0f34b929 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetConfig.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetConfig.java
@@ -19,6 +19,7 @@
package org.apache.hudi.io.storage.row;
import org.apache.hudi.io.storage.HoodieParquetConfig;
+import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
@@ -31,6 +32,11 @@ public class HoodieRowParquetConfig extends
HoodieParquetConfig<HoodieRowParquet
public HoodieRowParquetConfig(HoodieRowParquetWriteSupport writeSupport,
CompressionCodecName compressionCodecName,
int blockSize, int pageSize, long maxFileSize, Configuration hadoopConf,
double compressionRatio, boolean enableDictionary) {
- super(writeSupport, compressionCodecName, blockSize, pageSize,
maxFileSize, hadoopConf, compressionRatio, enableDictionary);
+ super(writeSupport, compressionCodecName, blockSize, pageSize, maxFileSize,
+ new HadoopStorageConfiguration(hadoopConf), compressionRatio,
enableDictionary);
+ }
+
+ public Configuration getHadoopConf() {
+ return getStorageConf().unwrapAs(Configuration.class);
}
}
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/TestHoodieAvroFileWriterFactory.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/TestHoodieAvroFileWriterFactory.java
index 4a13c77b629..74826c6f39b 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/TestHoodieAvroFileWriterFactory.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/TestHoodieAvroFileWriterFactory.java
@@ -24,6 +24,9 @@ import
org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex.IndexType;
+import org.apache.hudi.io.hadoop.HoodieAvroHFileWriter;
+import org.apache.hudi.io.hadoop.HoodieAvroOrcWriter;
+import org.apache.hudi.io.hadoop.HoodieAvroParquetWriter;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
index 6857513a2bb..f3b2bc69dc5 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
@@ -43,7 +43,6 @@ import org.apache.hudi.internal.schema.HoodieSchemaException;
import org.apache.hudi.internal.schema.InternalSchema;
import
org.apache.hudi.internal.schema.io.FileBasedInternalSchemaStorageManager;
import org.apache.hudi.internal.schema.utils.SerDeHelper;
-import org.apache.hudi.io.storage.HoodieAvroOrcReader;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.storage.HoodieStorage;
@@ -356,8 +355,9 @@ public class TableSchemaResolver {
private MessageType readSchemaFromORCBaseFile(StoragePath orcFilePath)
throws IOException {
LOG.info("Reading schema from {}", orcFilePath);
-
- HoodieAvroOrcReader orcReader = new
HoodieAvroOrcReader(metaClient.getRawHoodieStorage().getConf(), orcFilePath);
+ HoodieFileReader orcReader =
HoodieFileReaderFactory.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO)
+ .getFileReader(metaClient.getTableConfig(),
metaClient.getRawHoodieStorage().getConf(), orcFilePath,
+ HoodieFileFormat.ORC, Option.empty());
return convertAvroSchemaToParquet(orcReader.getSchema());
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
index 4d7f3f838f2..6c2e6802769 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
@@ -36,7 +36,6 @@ import org.apache.hudi.storage.inline.InLineFSUtils;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
@@ -107,38 +106,25 @@ public class HoodieParquetDataBlock extends
HoodieDataBlock {
}
Schema writerSchema = new
Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- try (FSDataOutputStream outputStream = new FSDataOutputStream(baos, null))
{
- HoodieFileWriter parquetWriter = null;
- HoodieConfig config = new HoodieConfig();
- config.setValue(PARQUET_COMPRESSION_CODEC_NAME.key(),
compressionCodecName.get().name());
- config.setValue(PARQUET_BLOCK_SIZE.key(),
String.valueOf(ParquetWriter.DEFAULT_BLOCK_SIZE));
- config.setValue(PARQUET_PAGE_SIZE.key(),
String.valueOf(ParquetWriter.DEFAULT_PAGE_SIZE));
- config.setValue(PARQUET_MAX_FILE_SIZE.key(), String.valueOf(1024 * 1024
* 1024));
- config.setValue(PARQUET_COMPRESSION_RATIO_FRACTION.key(),
String.valueOf(expectedCompressionRatio.get()));
- config.setValue(PARQUET_DICTIONARY_ENABLED,
String.valueOf(useDictionaryEncoding.get()));
- HoodieRecordType recordType = records.iterator().next().getRecordType();
- try {
- parquetWriter = HoodieFileWriterFactory.getFileWriter(
- HoodieFileFormat.PARQUET,
- outputStream,
- HoodieStorageUtils.getStorageConf(new Configuration()),
- config,
- writerSchema,
- recordType);
- for (HoodieRecord<?> record : records) {
- String recordKey = getRecordKey(record).orElse(null);
- parquetWriter.write(recordKey, record, writerSchema);
- }
- outputStream.flush();
- } finally {
- if (parquetWriter != null) {
- parquetWriter.close();
- }
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ HoodieConfig config = new HoodieConfig();
+ config.setValue(PARQUET_COMPRESSION_CODEC_NAME.key(),
compressionCodecName.get().name());
+ config.setValue(PARQUET_BLOCK_SIZE.key(),
String.valueOf(ParquetWriter.DEFAULT_BLOCK_SIZE));
+ config.setValue(PARQUET_PAGE_SIZE.key(),
String.valueOf(ParquetWriter.DEFAULT_PAGE_SIZE));
+ config.setValue(PARQUET_MAX_FILE_SIZE.key(), String.valueOf(1024 * 1024 *
1024));
+ config.setValue(PARQUET_COMPRESSION_RATIO_FRACTION.key(),
String.valueOf(expectedCompressionRatio.get()));
+ config.setValue(PARQUET_DICTIONARY_ENABLED,
String.valueOf(useDictionaryEncoding.get()));
+ HoodieRecordType recordType = records.iterator().next().getRecordType();
+ try (HoodieFileWriter parquetWriter =
HoodieFileWriterFactory.getFileWriter(
+ HoodieFileFormat.PARQUET, outputStream,
HoodieStorageUtils.getStorageConf(new Configuration()),
+ config, writerSchema, recordType)) {
+ for (HoodieRecord<?> record : records) {
+ String recordKey = getRecordKey(record).orElse(null);
+ parquetWriter.write(recordKey, record, writerSchema);
}
+ outputStream.flush();
}
-
- return baos.toByteArray();
+ return outputStream.toByteArray();
}
/**
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
index 58ec48ea630..42f8a6a2753 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
@@ -25,7 +25,7 @@ import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.io.storage.HoodieAvroParquetReader;
+import org.apache.hudi.io.storage.HoodieAvroFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.storage.StoragePath;
@@ -266,7 +266,7 @@ public class HoodieArchivedTimeline extends
HoodieDefaultTimeline {
.filter(fileName -> filter == null ||
LSMTimeline.isFileInRange(filter, fileName))
.parallel().forEach(fileName -> {
// Read the archived file
- try (HoodieAvroParquetReader reader = (HoodieAvroParquetReader)
HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO)
+ try (HoodieAvroFileReader reader = (HoodieAvroFileReader)
HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO)
.getFileReader(DEFAULT_HUDI_CONFIG_FOR_READER,
metaClient.getStorageConf(), new StoragePath(metaClient.getArchivePath(),
fileName))) {
try (ClosableIterator<IndexedRecord> iterator =
reader.getIndexedRecordIterator(HoodieLSMTimelineInstant.getClassSchema(),
readSchema)) {
while (iterator.hasNext()) {
diff --git
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReader.java
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReader.java
index a829880d5f9..9b49fa871e2 100644
---
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReader.java
+++
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReader.java
@@ -18,10 +18,32 @@
package org.apache.hudi.io.storage;
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.CloseableMappingIterator;
+
+import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
+import java.io.IOException;
+
+import static org.apache.hudi.common.util.TypeUtils.unsafeCast;
+
/**
- * Marker interface for every {@link HoodieFileReader} reading in Avro (ie
- * producing {@link IndexedRecord}s)
+ * Base class for every Avro file reader
*/
-public interface HoodieAvroFileReader extends HoodieFileReader<IndexedRecord>
{}
+public abstract class HoodieAvroFileReader implements
HoodieFileReader<IndexedRecord> {
+
+ @Override
+ public ClosableIterator<HoodieRecord<IndexedRecord>>
getRecordIterator(Schema readerSchema, Schema requestedSchema) throws
IOException {
+ ClosableIterator<IndexedRecord> iterator =
getIndexedRecordIterator(readerSchema, requestedSchema);
+ return new CloseableMappingIterator<>(iterator, data -> unsafeCast(new
HoodieAvroIndexedRecord(data)));
+ }
+
+ protected ClosableIterator<IndexedRecord> getIndexedRecordIterator(Schema
readerSchema) throws IOException {
+ return getIndexedRecordIterator(readerSchema, readerSchema);
+ }
+
+ public abstract ClosableIterator<IndexedRecord>
getIndexedRecordIterator(Schema readerSchema, Schema requestedSchema) throws
IOException;
+}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReaderBase.java
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReaderBase.java
deleted file mode 100644
index b15ce11fd53..00000000000
---
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReaderBase.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.io.storage;
-
-import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.util.collection.ClosableIterator;
-import org.apache.hudi.common.util.collection.CloseableMappingIterator;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.IndexedRecord;
-
-import java.io.IOException;
-
-import static org.apache.hudi.common.util.TypeUtils.unsafeCast;
-
-/**
- * Base class for every {@link HoodieAvroFileReader}
- */
-abstract class HoodieAvroFileReaderBase implements HoodieAvroFileReader {
-
- @Override
- public ClosableIterator<HoodieRecord<IndexedRecord>>
getRecordIterator(Schema readerSchema, Schema requestedSchema) throws
IOException {
- ClosableIterator<IndexedRecord> iterator =
getIndexedRecordIterator(readerSchema, requestedSchema);
- return new CloseableMappingIterator<>(iterator, data -> unsafeCast(new
HoodieAvroIndexedRecord(data)));
- }
-
- protected ClosableIterator<IndexedRecord> getIndexedRecordIterator(Schema
readerSchema) throws IOException {
- return getIndexedRecordIterator(readerSchema, readerSchema);
- }
-
- public abstract ClosableIterator<IndexedRecord>
getIndexedRecordIterator(Schema readerSchema, Schema requestedSchema) throws
IOException;
-}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileReaderImplBase.java
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileReaderImplBase.java
index 5e1a260e158..dd28d5f5589 100644
---
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileReaderImplBase.java
+++
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileReaderImplBase.java
@@ -38,7 +38,7 @@ import java.util.stream.Collectors;
import static org.apache.hudi.common.util.CollectionUtils.toStream;
import static org.apache.hudi.common.util.StringUtils.fromUTF8Bytes;
-public abstract class HoodieAvroHFileReaderImplBase extends
HoodieAvroFileReaderBase
+public abstract class HoodieAvroHFileReaderImplBase extends
HoodieAvroFileReader
implements HoodieSeekingFileReader<IndexedRecord> {
// TODO HoodieHFileReader right now tightly coupled to MT, we should break
that coupling
public static final String SCHEMA_KEY = "schema";
@@ -54,7 +54,7 @@ public abstract class HoodieAvroHFileReaderImplBase extends
HoodieAvroFileReader
* <p>
* Reads all the records with given schema
*/
- public static List<IndexedRecord> readAllRecords(HoodieAvroFileReaderBase
reader)
+ public static List<IndexedRecord> readAllRecords(HoodieAvroFileReader reader)
throws IOException {
Schema schema = reader.getSchema();
return toStream(reader.getIndexedRecordIterator(schema))
diff --git
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java
index fe075ccdc8f..c285f04a2b2 100644
---
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java
+++
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java
@@ -46,14 +46,21 @@ public class HoodieFileReaderFactory {
public static HoodieFileReaderFactory
getReaderFactory(HoodieRecord.HoodieRecordType recordType) {
switch (recordType) {
case AVRO:
- return new HoodieAvroFileReaderFactory();
+
+ try {
+ Class<?> clazz =
+
ReflectionUtils.getClass("org.apache.hudi.io.hadoop.HoodieAvroFileReaderFactory");
+ return (HoodieFileReaderFactory) clazz.newInstance();
+ } catch (IllegalArgumentException | IllegalAccessException |
InstantiationException e) {
+ throw new HoodieException("Unable to create
HoodieAvroFileReaderFactory", e);
+ }
case SPARK:
try {
Class<?> clazz =
ReflectionUtils.getClass("org.apache.hudi.io.storage.HoodieSparkFileReaderFactory");
return (HoodieFileReaderFactory) clazz.newInstance();
} catch (IllegalArgumentException | IllegalAccessException |
InstantiationException e) {
- throw new HoodieException("Unable to create hoodie spark file writer
factory", e);
+ throw new HoodieException("Unable to create
HoodieSparkFileReaderFactory", e);
}
default:
throw new UnsupportedOperationException(recordType + " record type not
supported yet.");
diff --git
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
index d57dd55fcd5..69a8924f508 100644
---
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
+++
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
@@ -33,9 +33,9 @@ import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
import org.apache.avro.Schema;
-import org.apache.hadoop.fs.FSDataOutputStream;
import java.io.IOException;
+import java.io.OutputStream;
import static org.apache.hudi.common.model.HoodieFileFormat.HFILE;
import static org.apache.hudi.common.model.HoodieFileFormat.ORC;
@@ -46,13 +46,18 @@ public class HoodieFileWriterFactory {
private static HoodieFileWriterFactory
getWriterFactory(HoodieRecord.HoodieRecordType recordType) {
switch (recordType) {
case AVRO:
- return new HoodieAvroFileWriterFactory();
+ try {
+ Class<?> clazz =
ReflectionUtils.getClass("org.apache.hudi.io.hadoop.HoodieAvroFileWriterFactory");
+ return (HoodieFileWriterFactory) clazz.newInstance();
+ } catch (IllegalAccessException | IllegalArgumentException |
InstantiationException e) {
+ throw new HoodieException("Unable to create
HoodieAvroFileWriterFactory", e);
+ }
case SPARK:
try {
Class<?> clazz =
ReflectionUtils.getClass("org.apache.hudi.io.storage.HoodieSparkFileWriterFactory");
return (HoodieFileWriterFactory) clazz.newInstance();
} catch (IllegalAccessException | IllegalArgumentException |
InstantiationException e) {
- throw new HoodieException("Unable to create hoodie spark file writer
factory", e);
+ throw new HoodieException("Unable to create
HoodieSparkFileWriterFactory", e);
}
default:
throw new UnsupportedOperationException(recordType + " record type not
supported yet.");
@@ -67,8 +72,8 @@ public class HoodieFileWriterFactory {
return factory.getFileWriterByFormat(extension, instantTime, path, conf,
config, schema, taskContextSupplier);
}
- public static <T, I, K, O> HoodieFileWriter getFileWriter(HoodieFileFormat
format,
- FSDataOutputStream
outputStream, StorageConfiguration<?> conf, HoodieConfig config, Schema schema,
HoodieRecordType recordType)
+ public static <T, I, K, O> HoodieFileWriter getFileWriter(HoodieFileFormat
format, OutputStream outputStream,
+
StorageConfiguration<?> conf, HoodieConfig config, Schema schema,
HoodieRecordType recordType)
throws IOException {
HoodieFileWriterFactory factory = getWriterFactory(recordType);
return factory.getFileWriterByFormat(format, outputStream, conf, config,
schema);
@@ -89,8 +94,8 @@ public class HoodieFileWriterFactory {
throw new UnsupportedOperationException(extension + " format not supported
yet.");
}
- protected <T, I, K, O> HoodieFileWriter
getFileWriterByFormat(HoodieFileFormat format,
-
FSDataOutputStream outputStream, StorageConfiguration<?> conf, HoodieConfig
config, Schema schema) throws IOException {
+ protected <T, I, K, O> HoodieFileWriter
getFileWriterByFormat(HoodieFileFormat format, OutputStream outputStream,
+
StorageConfiguration<?> conf, HoodieConfig config, Schema schema) throws
IOException {
switch (format) {
case PARQUET:
return newParquetFileWriter(outputStream, conf, config, schema);
@@ -106,7 +111,7 @@ public class HoodieFileWriterFactory {
}
protected HoodieFileWriter newParquetFileWriter(
- FSDataOutputStream outputStream, StorageConfiguration<?> conf,
HoodieConfig config, Schema schema) throws IOException {
+ OutputStream outputStream, StorageConfiguration<?> conf, HoodieConfig
config, Schema schema) throws IOException {
throw new UnsupportedOperationException();
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieOrcConfig.java
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieOrcConfig.java
index c45e02452e3..7cac57fa919 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieOrcConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieOrcConfig.java
@@ -18,23 +18,24 @@
package org.apache.hudi.io.storage;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.storage.StorageConfiguration;
+
import org.apache.orc.CompressionKind;
public class HoodieOrcConfig {
- static final String AVRO_SCHEMA_METADATA_KEY = "orc.avro.schema";
+ public static final String AVRO_SCHEMA_METADATA_KEY = "orc.avro.schema";
private final CompressionKind compressionKind;
private final int stripeSize;
private final int blockSize;
private final long maxFileSize;
- private final Configuration hadoopConf;
+ private final StorageConfiguration<?> storageConf;
private final BloomFilter bloomFilter;
- public HoodieOrcConfig(Configuration hadoopConf, CompressionKind
compressionKind, int stripeSize,
+ public HoodieOrcConfig(StorageConfiguration<?> storageConf, CompressionKind
compressionKind, int stripeSize,
int blockSize, long maxFileSize, BloomFilter bloomFilter) {
- this.hadoopConf = hadoopConf;
+ this.storageConf = storageConf;
this.compressionKind = compressionKind;
this.stripeSize = stripeSize;
this.blockSize = blockSize;
@@ -42,8 +43,8 @@ public class HoodieOrcConfig {
this.bloomFilter = bloomFilter;
}
- public Configuration getHadoopConf() {
- return hadoopConf;
+ public StorageConfiguration<?> getStorageConf() {
+ return storageConf;
}
public CompressionKind getCompressionKind() {
diff --git
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetConfig.java
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetConfig.java
index b5e567b7644..e17a017d679 100644
---
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetConfig.java
+++
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetConfig.java
@@ -18,7 +18,8 @@
package org.apache.hudi.io.storage;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.storage.StorageConfiguration;
+
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
/**
@@ -31,18 +32,18 @@ public class HoodieParquetConfig<T> {
private final int blockSize;
private final int pageSize;
private final long maxFileSize;
- private final Configuration hadoopConf;
+ private final StorageConfiguration<?> storageConf;
private final double compressionRatio;
private final boolean dictionaryEnabled;
- public HoodieParquetConfig(T writeSupport, CompressionCodecName
compressionCodecName, int blockSize,
- int pageSize, long maxFileSize, Configuration
hadoopConf, double compressionRatio, boolean dictionaryEnabled) {
+ public HoodieParquetConfig(T writeSupport, CompressionCodecName
compressionCodecName, int blockSize, int pageSize,
+ long maxFileSize, StorageConfiguration<?>
storageConf, double compressionRatio, boolean dictionaryEnabled) {
this.writeSupport = writeSupport;
this.compressionCodecName = compressionCodecName;
this.blockSize = blockSize;
this.pageSize = pageSize;
this.maxFileSize = maxFileSize;
- this.hadoopConf = hadoopConf;
+ this.storageConf = storageConf;
this.compressionRatio = compressionRatio;
this.dictionaryEnabled = dictionaryEnabled;
}
@@ -63,8 +64,8 @@ public class HoodieParquetConfig<T> {
return maxFileSize;
}
- public Configuration getHadoopConf() {
- return hadoopConf;
+ public StorageConfiguration<?> getStorageConf() {
+ return storageConf;
}
public double getCompressionRatio() {
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileSliceTestUtils.java
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileSliceTestUtils.java
index 43002a723ef..01052d4b00f 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileSliceTestUtils.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileSliceTestUtils.java
@@ -19,14 +19,12 @@
package org.apache.hudi.common.testutils.reader;
-import org.apache.hudi.avro.HoodieAvroWriteSupport;
-import org.apache.hudi.common.bloom.BloomFilter;
-import org.apache.hudi.common.bloom.BloomFilterFactory;
import org.apache.hudi.common.bloom.BloomFilterTypeCode;
+import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.config.HoodieReaderConfig;
+import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.LocalTaskContextSupplier;
-import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.DeleteRecord;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
@@ -35,6 +33,7 @@ import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
import org.apache.hudi.common.table.log.block.HoodieCDCDataBlock;
@@ -44,12 +43,13 @@ import
org.apache.hudi.common.table.log.block.HoodieHFileDataBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.log.block.HoodieParquetDataBlock;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.io.storage.HoodieAvroFileWriter;
-import org.apache.hudi.io.storage.HoodieParquetConfig;
+import org.apache.hudi.io.storage.HoodieFileWriterFactory;
import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
import org.apache.avro.Schema;
@@ -57,7 +57,6 @@ import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.compress.Compression;
-import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
@@ -78,7 +77,6 @@ import static
org.apache.hudi.common.testutils.FileCreateUtils.logFileName;
import static
org.apache.hudi.common.testutils.HoodieTestDataGenerator.AVRO_SCHEMA;
import static
org.apache.hudi.common.testutils.reader.DataGenerationPlan.OperationType.DELETE;
import static
org.apache.hudi.common.testutils.reader.DataGenerationPlan.OperationType.INSERT;
-import static
org.apache.hudi.io.storage.HoodieAvroFileWriterFactory.HOODIE_AVRO_PARQUET_WRITER;
public class HoodieFileSliceTestUtils {
public static final String FORWARD_SLASH = "/";
@@ -247,36 +245,31 @@ public class HoodieFileSliceTestUtils {
Schema schema,
String baseInstantTime
) throws IOException {
- Configuration hadoopConf = new Configuration();
+ StorageConfiguration<Configuration> conf =
HoodieTestUtils.getDefaultStorageConfWithDefaults();
// TODO: Optimize these hard-coded parameters for test purpose. (HUDI-7214)
- BloomFilter filter = BloomFilterFactory.createBloomFilter(
- 1000,
- 0.0001,
- 10000,
- BloomFilterTypeCode.DYNAMIC_V0.name());
- HoodieAvroWriteSupport<IndexedRecord> writeSupport = new
HoodieAvroWriteSupport<>(
- new AvroSchemaConverter().convert(schema),
- schema,
- Option.of(filter),
- new Properties());
- HoodieParquetConfig<HoodieAvroWriteSupport> parquetConfig = new
HoodieParquetConfig(
- writeSupport,
- CompressionCodecName.GZIP,
- ParquetWriter.DEFAULT_BLOCK_SIZE,
- ParquetWriter.DEFAULT_PAGE_SIZE,
- 1024 * 1024 * 1024,
- hadoopConf,
- 0.1,
- true);
-
- try (HoodieAvroFileWriter writer = (HoodieAvroFileWriter)
ReflectionUtils.loadClass(HOODIE_AVRO_PARQUET_WRITER,
- new Class<?>[] {StoragePath.class, HoodieParquetConfig.class,
String.class, TaskContextSupplier.class, boolean.class},
- new StoragePath(baseFilePath),
- parquetConfig,
- baseInstantTime,
- new LocalTaskContextSupplier(),
- true)) {
+ HoodieConfig cfg = new HoodieConfig();
+ // enable meta-field population and the parquet bloom filter
+ cfg.setValue(HoodieTableConfig.POPULATE_META_FIELDS.key(), "true");
+ cfg.setValue(HoodieStorageConfig.PARQUET_WITH_BLOOM_FILTER_ENABLED.key(),
"true");
+
+ // set bloom filter values
+ cfg.setValue(HoodieStorageConfig.BLOOM_FILTER_NUM_ENTRIES_VALUE.key(),
String.valueOf(1000));
+ cfg.setValue(HoodieStorageConfig.BLOOM_FILTER_FPP_VALUE.key(),
String.valueOf(0.0001));
+ cfg.setValue(HoodieStorageConfig.BLOOM_FILTER_DYNAMIC_MAX_ENTRIES.key(),
String.valueOf(10000));
+ cfg.setValue(HoodieStorageConfig.BLOOM_FILTER_TYPE.key(),
BloomFilterTypeCode.DYNAMIC_V0.name());
+
+ // set parquet config values
+ cfg.setValue(HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME.key(),
CompressionCodecName.GZIP.name());
+ cfg.setValue(HoodieStorageConfig.PARQUET_BLOCK_SIZE.key(),
String.valueOf(ParquetWriter.DEFAULT_BLOCK_SIZE));
+ cfg.setValue(HoodieStorageConfig.PARQUET_PAGE_SIZE.key(),
String.valueOf(ParquetWriter.DEFAULT_PAGE_SIZE));
+ cfg.setValue(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key(),
String.valueOf(1024 * 1024 * 1024));
+ cfg.setValue(HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION.key(),
String.valueOf(0.1));
+ cfg.setValue(HoodieStorageConfig.PARQUET_DICTIONARY_ENABLED.key(), "true");
+
+ try (HoodieAvroFileWriter writer = (HoodieAvroFileWriter)
HoodieFileWriterFactory
+ .getFileWriter(baseInstantTime, new StoragePath(baseFilePath), conf,
cfg,
+ schema, new LocalTaskContextSupplier(),
HoodieRecord.HoodieRecordType.AVRO)) {
for (IndexedRecord record : records) {
writer.writeAvro(
(String) record.get(schema.getField(ROW_KEY).pos()), record);
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieTestReaderContext.java
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieTestReaderContext.java
index b2d7306d6ab..6eb6733b04b 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieTestReaderContext.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieTestReaderContext.java
@@ -20,11 +20,13 @@
package org.apache.hudi.common.testutils.reader;
import org.apache.hudi.avro.model.HoodieDeleteRecord;
+import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieReaderContext;
import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieAvroRecordMerger;
+import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.util.ConfigUtils;
@@ -32,7 +34,8 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.SpillableMapUtils;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.io.storage.HoodieAvroParquetReader;
+import org.apache.hudi.io.storage.HoodieAvroFileReader;
+import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.HoodieStorageUtils;
import org.apache.hudi.storage.StorageConfiguration;
@@ -78,7 +81,9 @@ public class HoodieTestReaderContext extends
HoodieReaderContext<IndexedRecord>
Schema requiredSchema,
StorageConfiguration<?> conf
) throws IOException {
- HoodieAvroParquetReader reader = new HoodieAvroParquetReader(conf, new
StoragePath(filePath.toUri()));
+ HoodieAvroFileReader reader = (HoodieAvroFileReader)
HoodieFileReaderFactory
+
.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO).getFileReader(new
HoodieConfig(),
+ conf, filePath, HoodieFileFormat.PARQUET, Option.empty());
return reader.getIndexedRecordIterator(dataSchema, requiredSchema);
}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/io/storage/TestHoodieReaderWriterUtils.java
b/hudi-common/src/test/java/org/apache/hudi/io/storage/TestHoodieReaderWriterUtils.java
index a0ec0dfdb89..2fc38c156a3 100644
---
a/hudi-common/src/test/java/org/apache/hudi/io/storage/TestHoodieReaderWriterUtils.java
+++
b/hudi-common/src/test/java/org/apache/hudi/io/storage/TestHoodieReaderWriterUtils.java
@@ -44,7 +44,7 @@ import static
org.apache.hudi.io.hfile.TestHFileReader.DUMMY_BLOOM_FILTER;
* Utils for reader and writer tests.
*/
public class TestHoodieReaderWriterUtils {
- static void writeHFileForTesting(String fileLocation,
+ public static void writeHFileForTesting(String fileLocation,
int blockSize,
Compression.Algorithm compressionAlgo,
int numEntries,
diff --git
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReaderFactory.java
b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroFileReaderFactory.java
similarity index 81%
rename from
hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReaderFactory.java
rename to
hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroFileReaderFactory.java
index 6a6b0b67aa5..3a4d0b910ab 100644
---
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReaderFactory.java
+++
b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroFileReaderFactory.java
@@ -7,19 +7,25 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
-package org.apache.hudi.io.storage;
+package org.apache.hudi.io.hadoop;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.io.storage.HoodieAvroBootstrapFileReader;
+import org.apache.hudi.io.storage.HoodieFileReader;
+import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.io.storage.HoodieHBaseAvroHFileReader;
+import org.apache.hudi.io.storage.HoodieNativeAvroHFileReader;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.HoodieStorageUtils;
import org.apache.hudi.storage.StorageConfiguration;
diff --git
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileWriterFactory.java
b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroFileWriterFactory.java
similarity index 80%
rename from
hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileWriterFactory.java
rename to
hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroFileWriterFactory.java
index 2a727158e17..d0b8faa7589 100644
---
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileWriterFactory.java
+++
b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroFileWriterFactory.java
@@ -7,16 +7,17 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
-package org.apache.hudi.io.storage;
+package org.apache.hudi.io.hadoop;
import org.apache.hudi.avro.HoodieAvroWriteSupport;
import org.apache.hudi.common.bloom.BloomFilter;
@@ -27,6 +28,11 @@ import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase;
+import org.apache.hudi.io.storage.HoodieFileWriter;
+import org.apache.hudi.io.storage.HoodieFileWriterFactory;
+import org.apache.hudi.io.storage.HoodieOrcConfig;
+import org.apache.hudi.io.storage.HoodieParquetConfig;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
@@ -40,18 +46,19 @@ import
org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;
import java.io.IOException;
+import java.io.OutputStream;
import java.util.Properties;
-import static org.apache.hudi.io.storage.HoodieHFileConfig.CACHE_DATA_IN_L1;
-import static
org.apache.hudi.io.storage.HoodieHFileConfig.DROP_BEHIND_CACHE_COMPACTION;
-import static org.apache.hudi.io.storage.HoodieHFileConfig.HFILE_COMPARATOR;
-import static org.apache.hudi.io.storage.HoodieHFileConfig.PREFETCH_ON_OPEN;
+import static org.apache.hudi.io.hadoop.HoodieHFileConfig.CACHE_DATA_IN_L1;
+import static
org.apache.hudi.io.hadoop.HoodieHFileConfig.DROP_BEHIND_CACHE_COMPACTION;
+import static org.apache.hudi.io.hadoop.HoodieHFileConfig.HFILE_COMPARATOR;
+import static org.apache.hudi.io.hadoop.HoodieHFileConfig.PREFETCH_ON_OPEN;
public class HoodieAvroFileWriterFactory extends HoodieFileWriterFactory {
//hardcoded classes to remove at a later time
- public static final String HOODIE_AVRO_PARQUET_WRITER =
"org.apache.hudi.io.storage.HoodieAvroParquetWriter";
- public static final String HOODIE_AVRO_HFILE_WRITER =
"org.apache.hudi.io.storage.HoodieAvroHFileWriter";
- public static final String HOODIE_AVRO_ORC_WRITER =
"org.apache.hudi.io.storage.HoodieAvroOrcWriter";
+ public static final String HOODIE_AVRO_PARQUET_WRITER =
"org.apache.hudi.io.hadoop.HoodieAvroParquetWriter";
+ public static final String HOODIE_AVRO_HFILE_WRITER =
"org.apache.hudi.io.hadoop.HoodieAvroHFileWriter";
+ public static final String HOODIE_AVRO_ORC_WRITER =
"org.apache.hudi.io.hadoop.HoodieAvroOrcWriter";
@Override
protected HoodieFileWriter newParquetFileWriter(
@@ -70,7 +77,7 @@ public class HoodieAvroFileWriterFactory extends
HoodieFileWriterFactory {
config.getIntOrDefault(HoodieStorageConfig.PARQUET_BLOCK_SIZE),
config.getIntOrDefault(HoodieStorageConfig.PARQUET_PAGE_SIZE),
config.getLongOrDefault(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE),
- conf.unwrapAs(Configuration.class),
config.getDoubleOrDefault(HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION),
+ conf,
config.getDoubleOrDefault(HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION),
config.getBooleanOrDefault(HoodieStorageConfig.PARQUET_DICTIONARY_ENABLED));
try {
return (HoodieFileWriter)
ReflectionUtils.loadClass(HOODIE_AVRO_PARQUET_WRITER,
@@ -83,16 +90,16 @@ public class HoodieAvroFileWriterFactory extends
HoodieFileWriterFactory {
}
protected HoodieFileWriter newParquetFileWriter(
- FSDataOutputStream outputStream, StorageConfiguration<?> conf,
HoodieConfig config, Schema schema) throws IOException {
+ OutputStream outputStream, StorageConfiguration<?> conf, HoodieConfig
config, Schema schema) throws IOException {
HoodieAvroWriteSupport writeSupport = getHoodieAvroWriteSupport(conf,
schema, config, false);
HoodieParquetConfig<HoodieAvroWriteSupport> parquetConfig = new
HoodieParquetConfig<>(writeSupport,
CompressionCodecName.fromConf(config.getString(HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME)),
config.getInt(HoodieStorageConfig.PARQUET_BLOCK_SIZE),
config.getInt(HoodieStorageConfig.PARQUET_PAGE_SIZE),
config.getLong(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE), // todo:
1024*1024*1024
- conf.unwrapAs(Configuration.class),
config.getDouble(HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION),
+ conf,
config.getDouble(HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION),
config.getBoolean(HoodieStorageConfig.PARQUET_DICTIONARY_ENABLED));
- return new HoodieParquetStreamWriter(outputStream, parquetConfig);
+ return new HoodieParquetStreamWriter(new FSDataOutputStream(outputStream,
null), parquetConfig);
}
protected HoodieFileWriter newHFileFileWriter(
@@ -120,7 +127,7 @@ public class HoodieAvroFileWriterFactory extends
HoodieFileWriterFactory {
String instantTime, StoragePath path, StorageConfiguration<?> conf,
HoodieConfig config, Schema schema,
TaskContextSupplier taskContextSupplier) throws IOException {
BloomFilter filter = createBloomFilter(config);
- HoodieOrcConfig orcConfig = new
HoodieOrcConfig(conf.unwrapAs(Configuration.class),
+ HoodieOrcConfig orcConfig = new HoodieOrcConfig(conf,
CompressionKind.valueOf(config.getString(HoodieStorageConfig.ORC_COMPRESSION_CODEC_NAME)),
config.getInt(HoodieStorageConfig.ORC_STRIPE_SIZE),
config.getInt(HoodieStorageConfig.ORC_BLOCK_SIZE),
diff --git
a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileWriter.java
b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroHFileWriter.java
similarity index 93%
rename from
hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileWriter.java
rename to
hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroHFileWriter.java
index 6de6b24868b..379df6e97b9 100644
---
a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileWriter.java
+++
b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroHFileWriter.java
@@ -7,16 +7,17 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
-package org.apache.hudi.io.storage;
+package org.apache.hudi.io.hadoop;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.bloom.BloomFilter;
@@ -26,6 +27,8 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieDuplicateKeyException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.hadoop.fs.HoodieWrapperFileSystem;
+import org.apache.hudi.io.storage.HoodieAvroFileWriter;
+import org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase;
import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.storage.StoragePath;
diff --git
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroOrcReader.java
b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroOrcReader.java
similarity index 86%
rename from
hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroOrcReader.java
rename to
hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroOrcReader.java
index 4d90590d953..c1f5b79c227 100644
---
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroOrcReader.java
+++
b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroOrcReader.java
@@ -7,16 +7,17 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
-package org.apache.hudi.io.storage;
+package org.apache.hudi.io.hadoop;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.model.HoodieFileFormat;
@@ -26,6 +27,8 @@ import org.apache.hudi.common.util.OrcReaderIterator;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.io.storage.HoodieAvroFileReader;
+import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
@@ -47,7 +50,7 @@ import java.util.Set;
/**
* {@link HoodieFileReader} implementation for ORC format.
*/
-public class HoodieAvroOrcReader extends HoodieAvroFileReaderBase {
+public class HoodieAvroOrcReader extends HoodieAvroFileReader {
private final StoragePath path;
private final StorageConfiguration<?> conf;
diff --git
a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroOrcWriter.java
b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroOrcWriter.java
similarity index 91%
rename from
hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroOrcWriter.java
rename to
hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroOrcWriter.java
index 07e7bc7f122..40e37fa145f 100644
---
a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroOrcWriter.java
+++
b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroOrcWriter.java
@@ -7,16 +7,17 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
-package org.apache.hudi.io.storage;
+package org.apache.hudi.io.hadoop;
import org.apache.hudi.avro.HoodieAvroWriteSupport;
import org.apache.hudi.avro.HoodieBloomFilterWriteSupport;
@@ -27,6 +28,8 @@ import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.util.AvroOrcUtils;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.hadoop.fs.HoodieWrapperFileSystem;
+import org.apache.hudi.io.storage.HoodieAvroFileWriter;
+import org.apache.hudi.io.storage.HoodieOrcConfig;
import org.apache.hudi.storage.StoragePath;
import org.apache.avro.Schema;
@@ -70,7 +73,7 @@ public class HoodieAvroOrcWriter implements
HoodieAvroFileWriter, Closeable {
public HoodieAvroOrcWriter(String instantTime, StoragePath file,
HoodieOrcConfig config, Schema schema,
TaskContextSupplier taskContextSupplier) throws
IOException {
- Configuration conf = HadoopFSUtils.registerFileSystem(file,
config.getHadoopConf());
+ Configuration conf = HadoopFSUtils.registerFileSystem(file,
config.getStorageConf().unwrapAs(Configuration.class));
this.file = HoodieWrapperFileSystem.convertToHoodiePath(file, conf);
this.fs = (HoodieWrapperFileSystem) this.file.getFileSystem(conf);
this.instantTime = instantTime;
diff --git
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetReader.java
b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReader.java
similarity index 93%
rename from
hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetReader.java
rename to
hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReader.java
index 66ff6b483e0..d75660a9a7e 100644
---
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetReader.java
+++
b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReader.java
@@ -7,16 +7,17 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
-package org.apache.hudi.io.storage;
+package org.apache.hudi.io.hadoop;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.bloom.BloomFilter;
@@ -29,6 +30,8 @@ import org.apache.hudi.common.util.ParquetReaderIterator;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.CloseableMappingIterator;
import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.io.storage.HoodieAvroFileReader;
+import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
@@ -52,7 +55,7 @@ import static
org.apache.hudi.common.util.TypeUtils.unsafeCast;
/**
* {@link HoodieFileReader} implementation for parquet format.
*/
-public class HoodieAvroParquetReader extends HoodieAvroFileReaderBase {
+public class HoodieAvroParquetReader extends HoodieAvroFileReader {
private final StoragePath path;
private final StorageConfiguration<?> conf;
diff --git
a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetWriter.java
b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetWriter.java
similarity index 84%
rename from
hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetWriter.java
rename to
hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetWriter.java
index 4269e6513a2..f8f9a8ccea0 100644
---
a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetWriter.java
+++
b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetWriter.java
@@ -7,20 +7,23 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
-package org.apache.hudi.io.storage;
+package org.apache.hudi.io.hadoop;
import org.apache.hudi.avro.HoodieAvroWriteSupport;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.io.storage.HoodieAvroFileWriter;
+import org.apache.hudi.io.storage.HoodieParquetConfig;
import org.apache.hudi.storage.StoragePath;
import org.apache.avro.generic.IndexedRecord;
diff --git
a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/HoodieBaseParquetWriter.java
b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieBaseParquetWriter.java
similarity index 90%
rename from
hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/HoodieBaseParquetWriter.java
rename to
hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieBaseParquetWriter.java
index 06f1e513055..8f17fa0fa1e 100644
---
a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/HoodieBaseParquetWriter.java
+++
b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieBaseParquetWriter.java
@@ -7,20 +7,22 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
-package org.apache.hudi.io.storage;
+package org.apache.hudi.io.hadoop;
import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.hadoop.fs.HoodieWrapperFileSystem;
+import org.apache.hudi.io.storage.HoodieParquetConfig;
import org.apache.hudi.storage.StoragePath;
import org.apache.hadoop.conf.Configuration;
@@ -52,8 +54,9 @@ public abstract class HoodieBaseParquetWriter<R> implements
Closeable {
public HoodieBaseParquetWriter(StoragePath file,
HoodieParquetConfig<? extends
WriteSupport<R>> parquetConfig) throws IOException {
+ Configuration hadoopConf =
parquetConfig.getStorageConf().unwrapAs(Configuration.class);
ParquetWriter.Builder parquetWriterbuilder = new ParquetWriter.Builder(
- HoodieWrapperFileSystem.convertToHoodiePath(file,
parquetConfig.getHadoopConf())) {
+ HoodieWrapperFileSystem.convertToHoodiePath(file, hadoopConf)) {
@Override
protected ParquetWriter.Builder self() {
return this;
@@ -73,8 +76,8 @@ public abstract class HoodieBaseParquetWriter<R> implements
Closeable {
parquetWriterbuilder.withDictionaryEncoding(parquetConfig.dictionaryEnabled());
parquetWriterbuilder.withValidation(ParquetWriter.DEFAULT_IS_VALIDATING_ENABLED);
parquetWriterbuilder.withWriterVersion(ParquetWriter.DEFAULT_WRITER_VERSION);
- parquetWriterbuilder.withConf(HadoopFSUtils.registerFileSystem(file,
parquetConfig.getHadoopConf()));
- handleParquetBloomFilters(parquetWriterbuilder,
parquetConfig.getHadoopConf());
+ parquetWriterbuilder.withConf(HadoopFSUtils.registerFileSystem(file,
hadoopConf));
+ handleParquetBloomFilters(parquetWriterbuilder, hadoopConf);
parquetWriter = parquetWriterbuilder.build();
// We cannot accurately measure the snappy compressed output file size. We
are choosing a
diff --git
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileConfig.java
b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieHFileConfig.java
similarity index 87%
rename from
hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileConfig.java
rename to
hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieHFileConfig.java
index 64cc607ef63..83b659a6be0 100644
---
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileConfig.java
+++
b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieHFileConfig.java
@@ -7,18 +7,20 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
-package org.apache.hudi.io.storage;
+package org.apache.hudi.io.hadoop;
import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.io.storage.HoodieHBaseKVComparator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellComparator;
diff --git
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetStreamWriter.java
b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieParquetStreamWriter.java
similarity index 84%
rename from
hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetStreamWriter.java
rename to
hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieParquetStreamWriter.java
index 226266bf6cf..5fdd6505733 100644
---
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetStreamWriter.java
+++
b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieParquetStreamWriter.java
@@ -7,19 +7,22 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
-package org.apache.hudi.io.storage;
+package org.apache.hudi.io.hadoop;
import org.apache.hudi.avro.HoodieAvroWriteSupport;
import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.io.storage.HoodieAvroFileWriter;
+import org.apache.hudi.io.storage.HoodieParquetConfig;
import org.apache.hudi.parquet.io.OutputStreamBackedOutputFile;
import org.apache.avro.generic.IndexedRecord;
@@ -54,7 +57,7 @@ public class HoodieParquetStreamWriter implements
HoodieAvroFileWriter, AutoClos
.withDictionaryPageSize(parquetConfig.getPageSize())
.withDictionaryEncoding(parquetConfig.dictionaryEnabled())
.withWriterVersion(ParquetWriter.DEFAULT_WRITER_VERSION)
- .withConf(parquetConfig.getHadoopConf())
+ .withConf(parquetConfig.getStorageConf().unwrapAs(Configuration.class))
.build();
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/parquet/io/OutputStreamBackedOutputFile.java
b/hudi-hadoop-common/src/main/java/org/apache/hudi/parquet/io/OutputStreamBackedOutputFile.java
similarity index 100%
rename from
hudi-common/src/main/java/org/apache/hudi/parquet/io/OutputStreamBackedOutputFile.java
rename to
hudi-hadoop-common/src/main/java/org/apache/hudi/parquet/io/OutputStreamBackedOutputFile.java
diff --git
a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieAvroFileReaderFactory.java
b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieAvroFileReaderFactory.java
similarity index 83%
rename from
hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieAvroFileReaderFactory.java
rename to
hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieAvroFileReaderFactory.java
index 96b8ea9e6b3..7faf84a1ee5 100644
---
a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieAvroFileReaderFactory.java
+++
b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieAvroFileReaderFactory.java
@@ -7,19 +7,22 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
-package org.apache.hudi.io.storage;
+package org.apache.hudi.io.hadoop;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.io.storage.HoodieFileReader;
+import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
diff --git
a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieBaseParquetWriter.java
b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieBaseParquetWriter.java
similarity index 86%
rename from
hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieBaseParquetWriter.java
rename to
hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieBaseParquetWriter.java
index f9909b0f5f2..82a80b1ce26 100644
---
a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieBaseParquetWriter.java
+++
b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieBaseParquetWriter.java
@@ -7,28 +7,31 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
-package org.apache.hudi.io.storage;
+package org.apache.hudi.io.hadoop;
import org.apache.hudi.avro.HoodieAvroWriteSupport;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.bloom.BloomFilterFactory;
import org.apache.hudi.common.bloom.BloomFilterTypeCode;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.io.storage.HoodieParquetConfig;
+import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
-import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
@@ -83,7 +86,7 @@ public class TestHoodieBaseParquetWriter {
public void testCanWrite() throws IOException {
BloomFilter filter = BloomFilterFactory.createBloomFilter(1000, 0.0001,
10000,
BloomFilterTypeCode.DYNAMIC_V0.name());
- Configuration hadoopConf = new Configuration();
+ StorageConfiguration conf =
HoodieTestUtils.getDefaultStorageConfWithDefaults();
Schema schema = new
Schema.Parser().parse(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA);
HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(new
AvroSchemaConverter().convert(schema),
@@ -92,7 +95,7 @@ public class TestHoodieBaseParquetWriter {
long maxFileSize = 2 * 1024 * 1024;
HoodieParquetConfig<HoodieAvroWriteSupport> parquetConfig =
new HoodieParquetConfig<>(writeSupport, CompressionCodecName.GZIP,
ParquetWriter.DEFAULT_BLOCK_SIZE,
- ParquetWriter.DEFAULT_PAGE_SIZE, maxFileSize, hadoopConf, 0, true);
+ ParquetWriter.DEFAULT_PAGE_SIZE, maxFileSize, conf, 0, true);
StoragePath filePath = new StoragePath(
new StoragePath(tempDir.toUri()), "test_fileSize.parquet");
diff --git
a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHBaseHFileReaderWriter.java
b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHBaseHFileReaderWriter.java
similarity index 93%
rename from
hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHBaseHFileReaderWriter.java
rename to
hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHBaseHFileReaderWriter.java
index 48fa1ddc501..8c227b88e0f 100644
---
a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHBaseHFileReaderWriter.java
+++
b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHBaseHFileReaderWriter.java
@@ -7,19 +7,24 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
-package org.apache.hudi.io.storage;
+package org.apache.hudi.io.hadoop;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.io.storage.HoodieAvroFileReader;
+import org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase;
+import org.apache.hudi.io.storage.HoodieHBaseAvroHFileReader;
+import org.apache.hudi.io.storage.HoodieHFileUtils;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.HoodieStorageUtils;
import org.apache.hudi.storage.StorageConfiguration;
diff --git
a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java
b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHFileReaderWriter.java
similarity index 85%
rename from
hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java
rename to
hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHFileReaderWriter.java
index 6fe0e2ffea5..b87af2c8371 100644
---
a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java
+++
b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHFileReaderWriter.java
@@ -7,19 +7,23 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
-package org.apache.hudi.io.storage;
+package org.apache.hudi.io.hadoop;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.io.storage.HoodieAvroFileReader;
+import org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase;
+import org.apache.hudi.io.storage.HoodieNativeAvroHFileReader;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.avro.Schema;
diff --git
a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriterBase.java
b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHFileReaderWriterBase.java
similarity index 98%
rename from
hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriterBase.java
rename to
hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHFileReaderWriterBase.java
index dcd791956c5..3bdd561a282 100644
---
a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriterBase.java
+++
b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHFileReaderWriterBase.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.hudi.io.storage;
+package org.apache.hudi.io.hadoop;
import org.apache.hudi.common.bootstrap.index.HFileBootstrapIndex;
import org.apache.hudi.common.config.HoodieStorageConfig;
@@ -29,6 +29,9 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.io.storage.HoodieAvroFileReader;
+import org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase;
+import org.apache.hudi.io.storage.HoodieFileWriterFactory;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.HoodieStorageUtils;
import org.apache.hudi.storage.StorageConfiguration;
@@ -75,7 +78,7 @@ import static
org.apache.hudi.io.hfile.TestHFileReader.BOOTSTRAP_INDEX_HFILE_SUF
import static
org.apache.hudi.io.hfile.TestHFileReader.COMPLEX_SCHEMA_HFILE_SUFFIX;
import static
org.apache.hudi.io.hfile.TestHFileReader.SIMPLE_SCHEMA_HFILE_SUFFIX;
import static org.apache.hudi.io.hfile.TestHFileReader.readHFileFromResources;
-import static org.apache.hudi.io.storage.HoodieHFileConfig.HFILE_COMPARATOR;
+import static org.apache.hudi.io.hadoop.HoodieHFileConfig.HFILE_COMPARATOR;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
diff --git
a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieOrcReaderWriter.java
b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieOrcReaderWriter.java
similarity index 87%
rename from
hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieOrcReaderWriter.java
rename to
hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieOrcReaderWriter.java
index bc719be8bc8..6a94a32ed3c 100644
---
a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieOrcReaderWriter.java
+++
b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieOrcReaderWriter.java
@@ -7,16 +7,17 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
-package org.apache.hudi.io.storage;
+package org.apache.hudi.io.hadoop;
import org.apache.hudi.avro.HoodieBloomFilterWriteSupport;
import org.apache.hudi.common.bloom.BloomFilter;
@@ -25,6 +26,10 @@ import org.apache.hudi.common.bloom.BloomFilterTypeCode;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.io.storage.HoodieAvroFileReader;
+import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.io.storage.HoodieOrcConfig;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
@@ -57,7 +62,7 @@ public class TestHoodieOrcReaderWriter extends
TestHoodieReaderWriterBase {
protected HoodieAvroOrcWriter createWriter(
Schema avroSchema, boolean populateMetaFields) throws Exception {
BloomFilter filter = BloomFilterFactory.createBloomFilter(1000, 0.00001,
-1, BloomFilterTypeCode.SIMPLE.name());
- Configuration conf = new Configuration();
+ StorageConfiguration conf =
HoodieTestUtils.getDefaultStorageConfWithDefaults();
int orcStripSize =
Integer.parseInt(HoodieStorageConfig.ORC_STRIPE_SIZE.defaultValue());
int orcBlockSize =
Integer.parseInt(HoodieStorageConfig.ORC_BLOCK_SIZE.defaultValue());
int maxFileSize =
Integer.parseInt(HoodieStorageConfig.ORC_FILE_MAX_SIZE.defaultValue());
diff --git
a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieReaderWriterBase.java
b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieReaderWriterBase.java
similarity index 97%
rename from
hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieReaderWriterBase.java
rename to
hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieReaderWriterBase.java
index 226cf10f97e..3fd0ad80319 100644
---
a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieReaderWriterBase.java
+++
b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieReaderWriterBase.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.hudi.io.storage;
+package org.apache.hudi.io.hadoop;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
@@ -26,6 +26,11 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.io.storage.HoodieAvroFileReader;
+import org.apache.hudi.io.storage.HoodieAvroFileWriter;
+import org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase;
+import org.apache.hudi.io.storage.HoodieFileReader;
+import org.apache.hudi.io.storage.HoodieFileWriter;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/SparkHelpers.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/SparkHelpers.scala
index 4d925d3d4ed..791435f4bb7 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/SparkHelpers.scala
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/SparkHelpers.scala
@@ -24,12 +24,13 @@ import org.apache.hudi.common.config.HoodieStorageConfig
import
org.apache.hudi.common.config.HoodieStorageConfig.{BLOOM_FILTER_DYNAMIC_MAX_ENTRIES,
BLOOM_FILTER_FPP_VALUE, BLOOM_FILTER_NUM_ENTRIES_VALUE, BLOOM_FILTER_TYPE}
import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord}
import org.apache.hudi.common.util.{BaseFileUtils, Option}
-import org.apache.hudi.io.storage.{HoodieAvroParquetWriter,
HoodieParquetConfig}
+import org.apache.hudi.io.storage.HoodieParquetConfig
import org.apache.hudi.storage.{HoodieStorage, StorageConfiguration,
StoragePath}
import org.apache.avro.Schema
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
+import org.apache.hudi.io.hadoop.HoodieAvroParquetWriter
import org.apache.parquet.avro.AvroSchemaConverter
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.apache.spark.sql.{DataFrame, SQLContext}
@@ -61,12 +62,12 @@ object SparkHelpers {
HoodieStorageConfig.PARQUET_BLOCK_SIZE.defaultValue.toInt,
HoodieStorageConfig.PARQUET_PAGE_SIZE.defaultValue.toInt,
HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.defaultValue.toInt,
- conf.unwrap(),
+ conf,
HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION.defaultValue.toDouble,
HoodieStorageConfig.PARQUET_DICTIONARY_ENABLED.defaultValue)
// Add current classLoad for config, if not will throw classNotFound of
'HoodieWrapperFileSystem'.
-
parquetConfig.getHadoopConf().setClassLoader(Thread.currentThread.getContextClassLoader)
+ conf.unwrap().setClassLoader(Thread.currentThread.getContextClassLoader)
val writer = new HoodieAvroParquetWriter(destinationFile, parquetConfig,
instantTime, new SparkTaskContextSupplier(), true)
for (rec <- sourceRecords) {
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
index d39be52dd22..225cab39286 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
@@ -55,7 +55,7 @@ import org.apache.hudi.hadoop.HoodieParquetInputFormat;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
import org.apache.hudi.index.HoodieIndex.IndexType;
-import org.apache.hudi.io.storage.HoodieAvroParquetReader;
+import org.apache.hudi.io.hadoop.HoodieAvroParquetReader;
import org.apache.hudi.keygen.NonpartitionedKeyGenerator;
import org.apache.hudi.keygen.SimpleKeyGenerator;
import org.apache.hudi.storage.StoragePath;
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/storage/row/TestHoodieInternalRowParquetWriter.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/storage/row/TestHoodieInternalRowParquetWriter.java
index 65d140da8b3..95f151336c7 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/storage/row/TestHoodieInternalRowParquetWriter.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/storage/row/TestHoodieInternalRowParquetWriter.java
@@ -29,6 +29,7 @@ import org.apache.hudi.common.util.ParquetUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.io.storage.HoodieParquetConfig;
import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
import org.apache.hudi.testutils.HoodieSparkClientTestHarness;
import org.apache.hudi.testutils.SparkDatasetTestUtils;
@@ -89,7 +90,7 @@ public class TestHoodieInternalRowParquetWriter extends
HoodieSparkClientTestHar
HoodieWriteConfig cfg = writeConfigBuilder.build();
HoodieParquetConfig<HoodieRowParquetWriteSupport> parquetConfig = new
HoodieParquetConfig<>(writeSupport,
CompressionCodecName.SNAPPY, cfg.getParquetBlockSize(),
cfg.getParquetPageSize(), cfg.getParquetMaxFileSize(),
- writeSupport.getHadoopConf(), cfg.getParquetCompressionRatio(),
cfg.parquetDictionaryEnabled());
+ new HadoopStorageConfiguration(writeSupport.getHadoopConf()),
cfg.getParquetCompressionRatio(), cfg.parquetDictionaryEnabled());
StoragePath filePath = new StoragePath(basePath +
"/internal_row_writer.parquet");