This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 68fd850ec97062fa68b7b52e499903ef652fb1db
Author: Y Ethan Guo <[email protected]>
AuthorDate: Wed May 15 07:59:46 2024 -0700

    [HUDI-7752] Abstract serializeRecords for log writing (#11210)
---
 .../org/apache/hudi/config/HoodieWriteConfig.java | 11 +-
 .../org/apache/hudi/index/HoodieIndexUtils.java | 1 -
 .../hudi/io/HoodieKeyLocationFetchHandle.java | 8 +-
 .../row/HoodieRowDataFileWriterFactory.java | 3 +-
 .../client/TestHoodieJavaWriteClientInsert.java | 6 +-
 .../TestHoodieJavaClientOnCopyOnWriteStorage.java | 4 +-
 .../commit/TestJavaCopyOnWriteActionExecutor.java | 6 +-
 .../testutils/HoodieJavaClientTestHarness.java | 8 +-
 .../hudi/io/storage/HoodieSparkParquetReader.java | 18 +--
 .../row/HoodieInternalRowFileWriterFactory.java | 3 +-
 .../hudi/client/TestUpdateSchemaEvolution.java | 4 +-
 .../TestHoodieClientOnCopyOnWriteStorage.java | 14 +--
 .../commit/TestCopyOnWriteActionExecutor.java | 8 +-
 .../hudi/common/model/HoodiePartitionMetadata.java | 6 +-
 .../hudi/common/table/TableSchemaResolver.java | 6 +-
 .../common/table/log/block/HoodieDataBlock.java | 6 +-
 .../table/log/block/HoodieHFileDataBlock.java | 109 +++---------------
 .../table/log/block/HoodieParquetDataBlock.java | 54 ++-------
 .../table/timeline/HoodieArchivedTimeline.java | 2 -
 .../{BaseFileUtils.java => FileFormatUtils.java} | 29 ++++-
 .../hudi/metadata/HoodieTableMetadataUtil.java | 4 +-
 .../hudi/sink/bootstrap/BootstrapOperator.java | 4 +-
 .../org/apache/hudi/common/util/HFileUtils.java | 122 ++++++++++++++++++++-
 .../java/org/apache/hudi/common/util/OrcUtils.java | 11 +-
 .../org/apache/hudi/common/util/ParquetUtils.java | 53 ++++++++-
 .../apache/hudi/io/hadoop/HoodieAvroOrcReader.java | 6 +-
 .../hudi/io/hadoop/HoodieAvroParquetReader.java | 6 +-
 .../common/functional/TestHoodieLogFormat.java | 8 +-
 .../apache/hudi/common/util/TestHFileUtils.java | 59 ++++++++++
 .../hudi/hadoop/testutils/InputFormatTestUtil.java | 9 +-
 .../org/apache/spark/sql/hudi/SparkHelpers.scala | 8 +-
 .../org/apache/hudi/ColumnStatsIndexHelper.java | 4 +-
 .../utilities/HoodieMetadataTableValidator.java | 6 +-
 33 files changed, 374 insertions(+), 232 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 2d01f13b1db..c4b5be318ba 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -78,9 +78,7 @@ import org.apache.hudi.table.action.compact.CompactionTriggerStrategy; import org.apache.hudi.table.action.compact.strategy.CompactionStrategy; import org.apache.hudi.table.storage.HoodieStorageLayout; -import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.orc.CompressionKind; -import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -2068,9 +2066,8 @@ public class HoodieWriteConfig extends HoodieConfig { return getDouble(HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION); } - public CompressionCodecName getParquetCompressionCodec() { - String codecName = getString(HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME); - return CompressionCodecName.fromConf(StringUtils.isNullOrEmpty(codecName) ?
null : codecName); + public String getParquetCompressionCodec() { + return getString(HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME); } public boolean parquetDictionaryEnabled() { @@ -2114,8 +2111,8 @@ public class HoodieWriteConfig extends HoodieConfig { return getInt(HoodieStorageConfig.HFILE_BLOCK_SIZE); } - public Compression.Algorithm getHFileCompressionAlgorithm() { - return Compression.Algorithm.valueOf(getString(HoodieStorageConfig.HFILE_COMPRESSION_ALGORITHM_NAME)); + public String getHFileCompressionAlgorithm() { + return getString(HoodieStorageConfig.HFILE_COMPRESSION_ALGORITHM_NAME); } public long getOrcMaxFileSize() { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java index e4d0269a3e6..e7734877198 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java @@ -47,7 +47,6 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieIndexException; import org.apache.hudi.io.HoodieMergedReadHandle; import org.apache.hudi.io.storage.HoodieFileReader; -import org.apache.hudi.io.storage.HoodieFileReaderFactory; import org.apache.hudi.io.storage.HoodieIOFactory; import org.apache.hudi.keygen.BaseKeyGenerator; import org.apache.hudi.keygen.factory.HoodieAvroKeyGeneratorFactory; diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java index e397d07fcf6..9db4101cfcb 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java @@ -22,7 +22,7 @@ import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecordGlobalLocation; import org.apache.hudi.common.model.HoodieRecordLocation; -import org.apache.hudi.common.util.BaseFileUtils; +import org.apache.hudi.common.util.FileFormatUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; @@ -50,11 +50,11 @@ public class HoodieKeyLocationFetchHandle<T, I, K, O> extends HoodieReadHandle<T } private List<HoodieKey> fetchHoodieKeys(HoodieBaseFile baseFile) { - BaseFileUtils baseFileUtils = BaseFileUtils.getInstance(baseFile.getStoragePath()); + FileFormatUtils fileFormatUtils = FileFormatUtils.getInstance(baseFile.getStoragePath()); if (keyGeneratorOpt.isPresent()) { - return baseFileUtils.fetchHoodieKeys(hoodieTable.getStorageConf(), baseFile.getStoragePath(), keyGeneratorOpt); + return fileFormatUtils.fetchHoodieKeys(hoodieTable.getStorageConf(), baseFile.getStoragePath(), keyGeneratorOpt); } else { - return baseFileUtils.fetchHoodieKeys(hoodieTable.getStorageConf(), baseFile.getStoragePath()); + return fileFormatUtils.fetchHoodieKeys(hoodieTable.getStorageConf(), baseFile.getStoragePath()); } } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriterFactory.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriterFactory.java index be757a30954..8d2a87a5110 
100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriterFactory.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriterFactory.java @@ -33,6 +33,7 @@ import org.apache.hadoop.fs.Path; import java.io.IOException; import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET; +import static org.apache.hudi.common.util.ParquetUtils.getCompressionCodecName; import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePath; /** @@ -73,7 +74,7 @@ public class HoodieRowDataFileWriterFactory { return new HoodieRowDataParquetWriter( convertToStoragePath(path), new HoodieParquetConfig<>( writeSupport, - writeConfig.getParquetCompressionCodec(), + getCompressionCodecName(writeConfig.getParquetCompressionCodec()), writeConfig.getParquetBlockSize(), writeConfig.getParquetPageSize(), writeConfig.getParquetMaxFileSize(), diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestHoodieJavaWriteClientInsert.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestHoodieJavaWriteClientInsert.java index 1c877fbf621..718203561c7 100644 --- a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestHoodieJavaWriteClientInsert.java +++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestHoodieJavaWriteClientInsert.java @@ -31,7 +31,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.common.testutils.RawTripTestPayload; -import org.apache.hudi.common.util.BaseFileUtils; +import org.apache.hudi.common.util.FileFormatUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieIndexConfig; import org.apache.hudi.config.HoodieWriteConfig; @@ -147,7 +147,7 @@ public class TestHoodieJavaWriteClientInsert extends HoodieJavaClientTestHarness HoodieJavaWriteClient writeClient = getHoodieWriteClient(config); metaClient = HoodieTableMetaClient.reload(metaClient); - BaseFileUtils fileUtils = BaseFileUtils.getInstance(metaClient); + FileFormatUtils fileUtils = FileFormatUtils.getInstance(metaClient); // Get some records belong to the same partition (2021/09/11) String insertRecordStr1 = "{\"_row_key\":\"1\"," @@ -221,7 +221,7 @@ public class TestHoodieJavaWriteClientInsert extends HoodieJavaClientTestHarness HoodieJavaWriteClient writeClient = getHoodieWriteClient(config); metaClient = HoodieTableMetaClient.reload(metaClient); - BaseFileUtils fileUtils = BaseFileUtils.getInstance(metaClient); + FileFormatUtils fileUtils = FileFormatUtils.getInstance(metaClient); String partitionPath = "2021/09/11"; HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new String[]{partitionPath}); diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/functional/TestHoodieJavaClientOnCopyOnWriteStorage.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/functional/TestHoodieJavaClientOnCopyOnWriteStorage.java index 30b07d52d50..6f5352e2a34 100644 --- a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/functional/TestHoodieJavaClientOnCopyOnWriteStorage.java +++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/functional/TestHoodieJavaClientOnCopyOnWriteStorage.java @@ -63,9 +63,9 @@ import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import 
org.apache.hudi.common.testutils.HoodieTestTable; import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.common.testutils.RawTripTestPayload; -import org.apache.hudi.common.util.BaseFileUtils; import org.apache.hudi.common.util.ClusteringUtils; import org.apache.hudi.common.util.CollectionUtils; +import org.apache.hudi.common.util.FileFormatUtils; import org.apache.hudi.common.util.FileIOUtils; import org.apache.hudi.common.util.MarkerUtils; import org.apache.hudi.common.util.Option; @@ -1028,7 +1028,7 @@ public class TestHoodieJavaClientOnCopyOnWriteStorage extends HoodieJavaClientTe private Set<String> verifyRecordKeys(List<HoodieRecord> expectedRecords, List<WriteStatus> allStatus, List<GenericRecord> records) { for (WriteStatus status : allStatus) { StoragePath filePath = new StoragePath(basePath, status.getStat().getPath()); - records.addAll(BaseFileUtils.getInstance(metaClient).readAvroRecords(storageConf, filePath)); + records.addAll(FileFormatUtils.getInstance(metaClient).readAvroRecords(storageConf, filePath)); } Set<String> expectedKeys = recordsToRecordKeySet(expectedRecords); assertEquals(records.size(), expectedKeys.size()); diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/table/action/commit/TestJavaCopyOnWriteActionExecutor.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/table/action/commit/TestJavaCopyOnWriteActionExecutor.java index 30ebbef8b44..d14c2a30921 100644 --- a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/table/action/commit/TestJavaCopyOnWriteActionExecutor.java +++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/table/action/commit/TestJavaCopyOnWriteActionExecutor.java @@ -34,7 +34,7 @@ import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.common.testutils.RawTripTestPayload; import org.apache.hudi.common.testutils.Transformations; -import org.apache.hudi.common.util.BaseFileUtils; +import org.apache.hudi.common.util.FileFormatUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; @@ -131,7 +131,7 @@ public class TestJavaCopyOnWriteActionExecutor extends HoodieJavaClientTestHarne HoodieJavaWriteClient writeClient = getHoodieWriteClient(config); writeClient.startCommitWithTime(firstCommitTime); metaClient = HoodieTableMetaClient.reload(metaClient); - BaseFileUtils fileUtils = BaseFileUtils.getInstance(metaClient); + FileFormatUtils fileUtils = FileFormatUtils.getInstance(metaClient); String partitionPath = "2016/01/31"; @@ -480,7 +480,7 @@ public class TestJavaCopyOnWriteActionExecutor extends HoodieJavaClientTestHarne HoodieJavaWriteClient writeClient = getHoodieWriteClient(config); writeClient.startCommitWithTime(firstCommitTime); metaClient = HoodieTableMetaClient.reload(metaClient); - BaseFileUtils fileUtils = BaseFileUtils.getInstance(metaClient); + FileFormatUtils fileUtils = FileFormatUtils.getInstance(metaClient); String partitionPath = "2022/04/09"; diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java index da8404a66f0..430f8f01a5e 100644 --- a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java +++ 
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java @@ -50,7 +50,7 @@ import org.apache.hudi.common.table.view.SyncableFileSystemView; import org.apache.hudi.common.table.view.TableFileSystemView; import org.apache.hudi.common.testutils.HoodieTestTable; import org.apache.hudi.common.testutils.HoodieTestUtils; -import org.apache.hudi.common.util.BaseFileUtils; +import org.apache.hudi.common.util.FileFormatUtils; import org.apache.hudi.common.util.HoodieTimer; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; @@ -908,7 +908,7 @@ public abstract class HoodieJavaClientTestHarness extends HoodieWriterClientTest HashMap<String, String> paths = getLatestFileIDsToFullPath(basePath, commitTimeline, Arrays.asList(commitInstant)); return paths.values().stream().map(StoragePath::new).flatMap(path -> - BaseFileUtils.getInstance(path).readAvroRecords(context.getStorageConf(), path).stream()) + FileFormatUtils.getInstance(path).readAvroRecords(context.getStorageConf(), path).stream()) .filter(record -> { if (filterByCommitTime) { Object commitTime = record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD); @@ -937,7 +937,7 @@ public abstract class HoodieJavaClientTestHarness extends HoodieWriterClientTest try { List<HoodieBaseFile> latestFiles = getLatestBaseFiles(basePath, storage, paths); return latestFiles.stream().mapToLong(baseFile -> - BaseFileUtils.getInstance(baseFile.getStoragePath()) + FileFormatUtils.getInstance(baseFile.getStoragePath()) .readAvroRecords(context.getStorageConf(), baseFile.getStoragePath()).size()) .sum(); } catch (Exception e) { @@ -975,7 +975,7 @@ public abstract class HoodieJavaClientTestHarness extends HoodieWriterClientTest HashMap<String, String> fileIdToFullPath = getLatestFileIDsToFullPath(basePath, commitTimeline, commitsToReturn); String[] paths = fileIdToFullPath.values().toArray(new String[fileIdToFullPath.size()]); if (paths[0].endsWith(HoodieFileFormat.PARQUET.getFileExtension())) { - return Arrays.stream(paths).map(StoragePath::new).flatMap(path -> BaseFileUtils.getInstance(path).readAvroRecords(context.getStorageConf(), path).stream()) + return Arrays.stream(paths).map(StoragePath::new).flatMap(path -> FileFormatUtils.getInstance(path).readAvroRecords(context.getStorageConf(), path).stream()) .filter(record -> { if (lastCommitTimeOpt.isPresent()) { Object commitTime = record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java index e2b7e91d932..8bbf7840d5b 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java @@ -18,24 +18,24 @@ package org.apache.hudi.io.storage; -import org.apache.avro.Schema; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; import org.apache.hudi.SparkAdapterSupport$; import org.apache.hudi.avro.HoodieAvroUtils; -import org.apache.hudi.common.model.HoodieSparkRecord; import org.apache.hudi.common.bloom.BloomFilter; import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.util.BaseFileUtils; -import org.apache.hudi.common.util.collection.ClosableIterator; -import 
org.apache.hudi.common.util.collection.CloseableMappingIterator; +import org.apache.hudi.common.model.HoodieSparkRecord; +import org.apache.hudi.common.util.FileFormatUtils; import org.apache.hudi.common.util.ParquetReaderIterator; import org.apache.hudi.common.util.ParquetUtils; import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.common.util.collection.ClosableIterator; +import org.apache.hudi.common.util.collection.CloseableMappingIterator; import org.apache.hudi.storage.StorageConfiguration; import org.apache.hudi.storage.StoragePath; +import org.apache.avro.Schema; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.parquet.hadoop.ParquetReader; import org.apache.parquet.hadoop.api.ReadSupport; import org.apache.parquet.schema.MessageType; @@ -60,7 +60,7 @@ public class HoodieSparkParquetReader implements HoodieSparkFileReader { private final StoragePath path; private final StorageConfiguration<?> conf; - private final BaseFileUtils parquetUtils; + private final FileFormatUtils parquetUtils; private List<ParquetReaderIterator> readerIterators = new ArrayList<>(); public HoodieSparkParquetReader(StorageConfiguration<?> conf, StoragePath path) { @@ -68,7 +68,7 @@ public class HoodieSparkParquetReader implements HoodieSparkFileReader { this.conf = conf.newInstance(); // Avoid adding record in list element when convert parquet schema to avro schema conf.set(ADD_LIST_ELEMENT_RECORDS, "false"); - this.parquetUtils = BaseFileUtils.getInstance(HoodieFileFormat.PARQUET); + this.parquetUtils = FileFormatUtils.getInstance(HoodieFileFormat.PARQUET); } @Override diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowFileWriterFactory.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowFileWriterFactory.java index 8e7287a7024..7ebcd1f39ff 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowFileWriterFactory.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowFileWriterFactory.java @@ -34,6 +34,7 @@ import org.apache.spark.sql.types.StructType; import java.io.IOException; import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET; +import static org.apache.hudi.common.util.ParquetUtils.getCompressionCodecName; /** * Factory to assist in instantiating a new {@link HoodieInternalRowFileWriter}. 
@@ -76,7 +77,7 @@ public class HoodieInternalRowFileWriterFactory { path, new HoodieParquetConfig<>( writeSupport, - writeConfig.getParquetCompressionCodec(), + getCompressionCodecName(writeConfig.getParquetCompressionCodec()), writeConfig.getParquetBlockSize(), writeConfig.getParquetPageSize(), writeConfig.getParquetMaxFileSize(), diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java index 5e50e5ea891..26f3e193469 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java @@ -27,7 +27,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.common.testutils.RawTripTestPayload; -import org.apache.hudi.common.util.BaseFileUtils; +import org.apache.hudi.common.util.FileFormatUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieUpsertException; @@ -132,7 +132,7 @@ public class TestUpdateSchemaEvolution extends HoodieSparkClientTestHarness impl Executable executable = () -> { HoodieMergeHandle mergeHandle = new HoodieMergeHandle(updateTable.getConfig(), "101", updateTable, updateRecords.iterator(), updateRecords.get(0).getPartitionPath(), insertResult.getFileId(), supplier, Option.empty()); - List<GenericRecord> oldRecords = BaseFileUtils.getInstance(updateTable.getBaseFileFormat()) + List<GenericRecord> oldRecords = FileFormatUtils.getInstance(updateTable.getBaseFileFormat()) .readAvroRecords(updateTable.getStorageConf(), new StoragePath(updateTable.getConfig().getBasePath() + "/" + insertResult.getStat().getPath()), mergeHandle.getWriterSchemaWithMetaFields()); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java index f57e8d41ceb..0db85ae69c1 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java @@ -75,9 +75,9 @@ import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.testutils.HoodieTestTable; import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.common.testutils.RawTripTestPayload; -import org.apache.hudi.common.util.BaseFileUtils; import org.apache.hudi.common.util.ClusteringUtils; import org.apache.hudi.common.util.CollectionUtils; +import org.apache.hudi.common.util.FileFormatUtils; import org.apache.hudi.common.util.FileIOUtils; import org.apache.hudi.common.util.MarkerUtils; import org.apache.hudi.common.util.Option; @@ -1197,7 +1197,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { dataGen = new HoodieTestDataGenerator(new String[] {testPartitionPath}); SparkRDDWriteClient client = getHoodieWriteClient(config); - BaseFileUtils fileUtils = BaseFileUtils.getInstance(metaClient); + FileFormatUtils fileUtils = 
FileFormatUtils.getInstance(metaClient); // Inserts => will write file1 String commitTime1 = "001"; @@ -1310,7 +1310,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { HoodieWriteConfig config = getSmallInsertWriteConfig(insertSplitLimit, false, mergeAllowDuplicateInserts); // hold upto 200 records max dataGen = new HoodieTestDataGenerator(new String[] {testPartitionPath}); SparkRDDWriteClient client = getHoodieWriteClient(config); - BaseFileUtils fileUtils = BaseFileUtils.getInstance(metaClient); + FileFormatUtils fileUtils = FileFormatUtils.getInstance(metaClient); // Inserts => will write file1 String commitTime1 = "001"; @@ -1408,7 +1408,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { assertEquals(1, statuses.size(), "Just 1 file needs to be added."); String file1 = statuses.get(0).getFileId(); assertEquals(100, - BaseFileUtils.getInstance(metaClient).readRowKeys(storageConf, new StoragePath(basePath, statuses.get(0).getStat().getPath())) + FileFormatUtils.getInstance(metaClient).readRowKeys(storageConf, new StoragePath(basePath, statuses.get(0).getStat().getPath())) .size(), "file should contain 100 records"); // Delete 20 among 100 inserted @@ -2090,7 +2090,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { private Set<String> verifyRecordKeys(List<HoodieRecord> expectedRecords, List<WriteStatus> allStatus, List<GenericRecord> records) { for (WriteStatus status : allStatus) { StoragePath filePath = new StoragePath(basePath, status.getStat().getPath()); - records.addAll(BaseFileUtils.getInstance(metaClient).readAvroRecords(storageConf, filePath)); + records.addAll(FileFormatUtils.getInstance(metaClient).readAvroRecords(storageConf, filePath)); } Set<String> expectedKeys = recordsToRecordKeySet(expectedRecords); assertEquals(records.size(), expectedKeys.size()); @@ -2179,10 +2179,10 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { StoragePath newFile = new StoragePath(basePath, statuses.get(0).getStat().getPath()); assertEquals(expectedRecords, - BaseFileUtils.getInstance(metaClient).readRowKeys(storageConf, newFile).size(), + FileFormatUtils.getInstance(metaClient).readRowKeys(storageConf, newFile).size(), "file should contain 110 records"); - List<GenericRecord> records = BaseFileUtils.getInstance(metaClient).readAvroRecords(storageConf, newFile); + List<GenericRecord> records = FileFormatUtils.getInstance(metaClient).readAvroRecords(storageConf, newFile); for (GenericRecord record : records) { String recordKey = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(); assertTrue(keys.contains(recordKey), "key expected to be part of " + instantTime); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java index 594036be5b1..c71a0ca85fb 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java @@ -36,7 +36,7 @@ import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.common.testutils.RawTripTestPayload; import org.apache.hudi.common.testutils.Transformations; -import 
org.apache.hudi.common.util.BaseFileUtils; +import org.apache.hudi.common.util.FileFormatUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieIndexConfig; @@ -205,14 +205,14 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestBase implemen // Read out the bloom filter and make sure filter can answer record exist or not Path filePath = allFiles[0].getPath(); - BloomFilter filter = BaseFileUtils.getInstance(table.getBaseFileFormat()) + BloomFilter filter = FileFormatUtils.getInstance(table.getBaseFileFormat()) .readBloomFilterFromMetadata(storageConf, new StoragePath(filePath.toUri())); for (HoodieRecord record : records) { assertTrue(filter.mightContain(record.getRecordKey())); } // Read the base file, check the record content - List<GenericRecord> fileRecords = BaseFileUtils.getInstance(table.getBaseFileFormat()) + List<GenericRecord> fileRecords = FileFormatUtils.getInstance(table.getBaseFileFormat()) .readAvroRecords(storageConf, new StoragePath(filePath.toUri())); GenericRecord newRecord; int index = 0; @@ -248,7 +248,7 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestBase implemen // Check whether the record has been updated Path updatedFilePath = allFiles[0].getPath(); BloomFilter updatedFilter = - BaseFileUtils.getInstance(metaClient).readBloomFilterFromMetadata(storageConf, new StoragePath(updatedFilePath.toUri())); + FileFormatUtils.getInstance(metaClient).readBloomFilterFromMetadata(storageConf, new StoragePath(updatedFilePath.toUri())); for (HoodieRecord record : records) { // No change to the _row_key assertTrue(updatedFilter.mightContain(record.getRecordKey())); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java index e8edc8b9142..5d75414c6ff 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java @@ -18,7 +18,7 @@ package org.apache.hudi.common.model; -import org.apache.hudi.common.util.BaseFileUtils; +import org.apache.hudi.common.util.FileFormatUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.RetryHelper; import org.apache.hudi.common.util.StringUtils; @@ -137,7 +137,7 @@ public class HoodiePartitionMetadata { HOODIE_PARTITION_METAFILE_PREFIX + "_" + UUID.randomUUID() + getMetafileExtension()); try { // write to temporary file - BaseFileUtils.getInstance(format).writeMetaFile(storage, tmpPath, props); + FileFormatUtils.getInstance(format).writeMetaFile(storage, tmpPath, props); // move to actual path storage.rename(tmpPath, filePath); } finally { @@ -185,7 +185,7 @@ public class HoodiePartitionMetadata { private boolean readBaseFormatMetaFile() { for (StoragePath metafilePath : baseFormatMetaFilePaths(partitionPath)) { try { - BaseFileUtils reader = BaseFileUtils.getInstance(metafilePath); + FileFormatUtils reader = FileFormatUtils.getInstance(metafilePath); // Data file format Map<String, String> metadata = reader.readFooter( storage.getConf(), true, metafilePath, PARTITION_DEPTH_KEY, COMMIT_TIME_KEY); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java index 278692dbf5b..d0a395c83a0 100644 --- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java @@ -32,7 +32,7 @@ import org.apache.hudi.common.table.log.block.HoodieLogBlock; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; -import org.apache.hudi.common.util.BaseFileUtils; +import org.apache.hudi.common.util.FileFormatUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.collection.Pair; @@ -302,7 +302,7 @@ public class TableSchemaResolver { .orElseThrow(() -> new IllegalArgumentException("Could not find any data file written for compaction " + lastCompactionCommit + ", could not get schema for table " + metaClient.getBasePath())); StoragePath path = new StoragePath(filePath); - return BaseFileUtils.getInstance(path).readAvroSchema(metaClient.getStorageConf(), path); + return FileFormatUtils.getInstance(path).readAvroSchema(metaClient.getStorageConf(), path); } private Schema readSchemaFromLogFile(StoragePath path) throws IOException { @@ -469,7 +469,7 @@ public class TableSchemaResolver { // this is a log file schema = readSchemaFromLogFile(filePath); } else { - schema = BaseFileUtils.getInstance(filePath).readAvroSchema(metaClient.getStorageConf(), filePath); + schema = FileFormatUtils.getInstance(filePath).readAvroSchema(metaClient.getStorageConf(), filePath); } } return schema; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java index 6d75ce40355..0b1fcc6dc02 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java @@ -20,8 +20,6 @@ package org.apache.hudi.common.table.log.block; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType; -import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType; -import org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockContentLocation; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.ClosableIterator; import org.apache.hudi.exception.HoodieIOException; @@ -119,6 +117,10 @@ public abstract class HoodieDataBlock extends HoodieLogBlock { return serializeRecords(records.get(), storageConf); } + public String getKeyFieldName() { + return keyFieldName; + } + protected static Schema getWriterSchema(Map<HeaderMetadataType, String> logBlockHeader) { return new Schema.Parser().parse(logBlockHeader.get(HeaderMetadataType.SCHEMA)); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java index 356bab33bd0..d6fbb52fc7e 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java @@ -24,12 +24,10 @@ import org.apache.hudi.common.config.HoodieReaderConfig; import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.HoodieRecord; import 
org.apache.hudi.common.model.HoodieRecord.HoodieRecordType; +import org.apache.hudi.common.util.FileFormatUtils; import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.common.util.collection.ClosableIterator; import org.apache.hudi.common.util.collection.CloseableMappingIterator; -import org.apache.hudi.exception.HoodieException; -import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.io.SeekableDataInputStream; import org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase; import org.apache.hudi.io.storage.HoodieFileReader; @@ -43,28 +41,17 @@ import org.apache.hudi.storage.inline.InLineFSUtils; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.io.compress.Compression; -import org.apache.hadoop.hbase.io.hfile.CacheConfig; -import org.apache.hadoop.hbase.io.hfile.HFile; -import org.apache.hadoop.hbase.io.hfile.HFileContext; -import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.util.Collections; import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Properties; -import java.util.TreeMap; import java.util.function.Supplier; -import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes; +import static org.apache.hudi.common.config.HoodieStorageConfig.HFILE_COMPRESSION_ALGORITHM_NAME; import static org.apache.hudi.common.util.TypeUtils.unsafeCast; import static org.apache.hudi.common.util.ValidationUtils.checkState; import static org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase.KEY_FIELD_NAME; @@ -75,10 +62,8 @@ import static org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase.KEY_FIELD */ public class HoodieHFileDataBlock extends HoodieDataBlock { private static final Logger LOG = LoggerFactory.getLogger(HoodieHFileDataBlock.class); - private static final int DEFAULT_BLOCK_SIZE = 1024 * 1024; - private static final String KV_COMPARATOR_CLASS_NAME = "org.apache.hudi.io.storage.HoodieHBaseKVComparator"; - private final Option<Compression.Algorithm> compressionAlgorithm; + private final Option<String> compressionCodec; // This path is used for constructing HFile reader context, which should not be // interpreted as the actual file path for the HFile data blocks private final StoragePath pathForReader; @@ -95,19 +80,19 @@ public class HoodieHFileDataBlock extends HoodieDataBlock { StoragePath pathForReader, boolean useNativeHFileReader) { super(content, inputStreamSupplier, readBlockLazily, Option.of(logBlockContentLocation), readerSchema, - header, footer, KEY_FIELD_NAME, enablePointLookups); - this.compressionAlgorithm = Option.empty(); + header, footer, HoodieAvroHFileReaderImplBase.KEY_FIELD_NAME, enablePointLookups); + this.compressionCodec = Option.empty(); this.pathForReader = pathForReader; this.hFileReaderConfig = getHFileReaderConfig(useNativeHFileReader); } public HoodieHFileDataBlock(List<HoodieRecord> records, Map<HeaderMetadataType, String> header, - Compression.Algorithm compressionAlgorithm, + String compressionCodec, StoragePath pathForReader, boolean useNativeHFileReader) { super(records, header, new HashMap<>(), KEY_FIELD_NAME); - 
this.compressionAlgorithm = Option.of(compressionAlgorithm); + this.compressionCodec = Option.of(compressionCodec); this.pathForReader = pathForReader; this.hFileReaderConfig = getHFileReaderConfig(useNativeHFileReader); } @@ -119,70 +104,11 @@ public class HoodieHFileDataBlock extends HoodieDataBlock { @Override protected byte[] serializeRecords(List<HoodieRecord> records, StorageConfiguration<?> storageConf) throws IOException { - HFileContext context = new HFileContextBuilder() - .withBlockSize(DEFAULT_BLOCK_SIZE) - .withCompression(compressionAlgorithm.get()) - .withCellComparator(ReflectionUtils.loadClass(KV_COMPARATOR_CLASS_NAME)) - .build(); - - Configuration conf = storageConf.unwrapAs(Configuration.class); - CacheConfig cacheConfig = new CacheConfig(conf); - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - FSDataOutputStream ostream = new FSDataOutputStream(baos, null); - - // Use simple incrementing counter as a key - boolean useIntegerKey = !getRecordKey(records.get(0)).isPresent(); - // This is set here to avoid re-computing this in the loop - int keyWidth = useIntegerKey ? (int) Math.ceil(Math.log(records.size())) + 1 : -1; - - // Serialize records into bytes - Map<String, byte[]> sortedRecordsMap = new TreeMap<>(); - // Get writer schema - Schema writerSchema = new Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA)); - - Iterator<HoodieRecord> itr = records.iterator(); - int id = 0; - while (itr.hasNext()) { - HoodieRecord<?> record = itr.next(); - String recordKey; - if (useIntegerKey) { - recordKey = String.format("%" + keyWidth + "s", id++); - } else { - recordKey = getRecordKey(record).get(); - } - - final byte[] recordBytes = serializeRecord(record, writerSchema); - if (sortedRecordsMap.containsKey(recordKey)) { - LOG.error("Found duplicate record with recordKey: " + recordKey); - printRecord("Previous record", sortedRecordsMap.get(recordKey), writerSchema); - printRecord("Current record", recordBytes, writerSchema); - throw new HoodieException(String.format("Writing multiple records with same key %s not supported for %s", - recordKey, this.getClass().getName())); - } - sortedRecordsMap.put(recordKey, recordBytes); - } - - HFile.Writer writer = HFile.getWriterFactory(conf, cacheConfig) - .withOutputStream(ostream).withFileContext(context).create(); - - // Write the records - sortedRecordsMap.forEach((recordKey, recordBytes) -> { - try { - KeyValue kv = new KeyValue(getUTF8Bytes(recordKey), null, null, recordBytes); - writer.append(kv); - } catch (IOException e) { - throw new HoodieIOException("IOException serializing records", e); - } - }); - - writer.appendFileInfo( - getUTF8Bytes(HoodieAvroHFileReaderImplBase.SCHEMA_KEY), getUTF8Bytes(getSchema().toString())); - - writer.close(); - ostream.flush(); - ostream.close(); - - return baos.toByteArray(); + Schema writerSchema = new Schema.Parser().parse( + super.getLogBlockHeader().get(HoodieLogBlock.HeaderMetadataType.SCHEMA)); + return FileFormatUtils.getInstance(HoodieFileFormat.HFILE).serializeRecordsToLogBlock( + storageConf, records, writerSchema, getSchema(), getKeyFieldName(), + Collections.singletonMap(HFILE_COMPRESSION_ALGORITHM_NAME.key(), compressionCodec.get())); } @Override @@ -226,15 +152,6 @@ public class HoodieHFileDataBlock extends HoodieDataBlock { } } - private byte[] serializeRecord(HoodieRecord<?> record, Schema schema) throws IOException { - Option<Schema.Field> keyField = getKeyField(schema); - // Reset key value w/in the record to avoid duplicating the key w/in 
payload - if (keyField.isPresent()) { - record.truncateRecordKey(schema, new Properties(), keyField.get().name()); - } - return HoodieAvroUtils.recordToBytes(record, schema).get(); - } - /** * Print the record in json format */ diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java index e370b156be8..b94b92a942a 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java @@ -18,37 +18,28 @@ package org.apache.hudi.common.table.log.block; -import org.apache.hudi.common.config.HoodieConfig; -import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType; +import org.apache.hudi.common.util.FileFormatUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.ClosableIterator; import org.apache.hudi.io.SeekableDataInputStream; -import org.apache.hudi.io.storage.HoodieFileWriter; -import org.apache.hudi.io.storage.HoodieFileWriterFactory; import org.apache.hudi.io.storage.HoodieIOFactory; import org.apache.hudi.storage.StorageConfiguration; import org.apache.hudi.storage.StoragePath; import org.apache.hudi.storage.inline.InLineFSUtils; import org.apache.avro.Schema; -import org.apache.parquet.hadoop.ParquetWriter; -import org.apache.parquet.hadoop.metadata.CompressionCodecName; -import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.function.Supplier; -import static org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_BLOCK_SIZE; import static org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME; import static org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION; import static org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_DICTIONARY_ENABLED; -import static org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_MAX_FILE_SIZE; -import static org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_PAGE_SIZE; import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET; import static org.apache.hudi.common.util.ConfigUtils.DEFAULT_HUDI_CONFIG_FOR_READER; @@ -57,7 +48,7 @@ import static org.apache.hudi.common.util.ConfigUtils.DEFAULT_HUDI_CONFIG_FOR_RE */ public class HoodieParquetDataBlock extends HoodieDataBlock { - private final Option<CompressionCodecName> compressionCodecName; + private final Option<String> compressionCodecName; private final Option<Double> expectedCompressionRatio; private final Option<Boolean> useDictionaryEncoding; @@ -79,7 +70,7 @@ public class HoodieParquetDataBlock extends HoodieDataBlock { public HoodieParquetDataBlock(List<HoodieRecord> records, Map<HeaderMetadataType, String> header, String keyField, - CompressionCodecName compressionCodecName, + String compressionCodecName, double expectedCompressionRatio, boolean useDictionaryEncoding ) { @@ -97,36 +88,15 @@ public class HoodieParquetDataBlock extends HoodieDataBlock { @Override protected byte[] serializeRecords(List<HoodieRecord> records, StorageConfiguration<?> storageConf) throws IOException { - if (records.size() == 0) { - return new byte[0]; - } - - Schema writerSchema = new 
Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA)); - ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); - HoodieConfig config = new HoodieConfig(); - config.setValue(PARQUET_COMPRESSION_CODEC_NAME.key(), compressionCodecName.get().name()); - config.setValue(PARQUET_BLOCK_SIZE.key(), String.valueOf(ParquetWriter.DEFAULT_BLOCK_SIZE)); - config.setValue(PARQUET_PAGE_SIZE.key(), String.valueOf(ParquetWriter.DEFAULT_PAGE_SIZE)); - config.setValue(PARQUET_MAX_FILE_SIZE.key(), String.valueOf(1024 * 1024 * 1024)); - config.setValue(PARQUET_COMPRESSION_RATIO_FRACTION.key(), String.valueOf(expectedCompressionRatio.get())); - config.setValue(PARQUET_DICTIONARY_ENABLED, String.valueOf(useDictionaryEncoding.get())); - HoodieRecordType recordType = records.iterator().next().getRecordType(); - HoodieFileWriter parquetWriter = null; - try { - parquetWriter = HoodieFileWriterFactory.getFileWriter( - HoodieFileFormat.PARQUET, outputStream, storageConf, - config, writerSchema, recordType); - for (HoodieRecord<?> record : records) { - String recordKey = getRecordKey(record).orElse(null); - parquetWriter.write(recordKey, record, writerSchema); - } - outputStream.flush(); - } finally { - if (parquetWriter != null) { - parquetWriter.close(); - } - } - return outputStream.toByteArray(); + Map<String, String> paramsMap = new HashMap<>(); + paramsMap.put(PARQUET_COMPRESSION_CODEC_NAME.key(), compressionCodecName.get()); + paramsMap.put(PARQUET_COMPRESSION_RATIO_FRACTION.key(), String.valueOf(expectedCompressionRatio.get())); + paramsMap.put(PARQUET_DICTIONARY_ENABLED.key(), String.valueOf(useDictionaryEncoding.get())); + Schema writerSchema = new Schema.Parser().parse( + super.getLogBlockHeader().get(HoodieLogBlock.HeaderMetadataType.SCHEMA)); + + return FileFormatUtils.getInstance(PARQUET).serializeRecordsToLogBlock( + storageConf, records, writerSchema, getSchema(), getKeyFieldName(), paramsMap); } /** diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java index 8914fa5249b..587fd31866e 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java @@ -35,8 +35,6 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.collection.ClosableIterator; import org.apache.hudi.exception.HoodieIOException; -import org.apache.hudi.io.storage.HoodieAvroFileReader; -import org.apache.hudi.io.storage.HoodieIOFactory; import org.apache.hudi.storage.HoodieStorage; import org.apache.hudi.storage.StoragePath; import org.apache.hudi.storage.StoragePathInfo; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/FileFormatUtils.java similarity index 90% rename from hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java rename to hudi-common/src/main/java/org/apache/hudi/common/util/FileFormatUtils.java index 8fb224dddaa..d5620fdcf65 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/FileFormatUtils.java @@ -26,6 +26,7 @@ import org.apache.hudi.common.model.HoodieColumnRangeMetadata; import org.apache.hudi.common.model.HoodieFileFormat; 
import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.util.collection.ClosableIterator; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.keygen.BaseKeyGenerator; @@ -44,14 +45,14 @@ import java.util.Properties; import java.util.Set; /** - * Utils for Hudi base file. + * Utils for file format used in Hudi. */ -public abstract class BaseFileUtils { +public abstract class FileFormatUtils { public static final String PARQUET_UTILS = "org.apache.hudi.common.util.ParquetUtils"; public static final String ORC_UTILS = "org.apache.hudi.common.util.OrcUtils"; public static final String HFILE_UTILS = "org.apache.hudi.common.util.HFileUtils"; - public static BaseFileUtils getInstance(StoragePath path) { + public static FileFormatUtils getInstance(StoragePath path) { if (path.getFileExtension().equals(HoodieFileFormat.PARQUET.getFileExtension())) { return ReflectionUtils.loadClass(PARQUET_UTILS); } else if (path.getFileExtension().equals(HoodieFileFormat.ORC.getFileExtension())) { @@ -62,7 +63,7 @@ public abstract class BaseFileUtils { throw new UnsupportedOperationException("The format for file " + path + " is not supported yet."); } - public static BaseFileUtils getInstance(HoodieFileFormat fileFormat) { + public static FileFormatUtils getInstance(HoodieFileFormat fileFormat) { if (HoodieFileFormat.PARQUET.equals(fileFormat)) { return ReflectionUtils.loadClass(PARQUET_UTILS); } else if (HoodieFileFormat.ORC.equals(fileFormat)) { @@ -73,7 +74,7 @@ public abstract class BaseFileUtils { throw new UnsupportedOperationException(fileFormat.name() + " format not supported yet."); } - public static BaseFileUtils getInstance(HoodieTableMetaClient metaClient) { + public static FileFormatUtils getInstance(HoodieTableMetaClient metaClient) { return getInstance(metaClient.getTableConfig().getBaseFileFormat()); } @@ -268,4 +269,22 @@ public abstract class BaseFileUtils { public abstract void writeMetaFile(HoodieStorage storage, StoragePath filePath, Properties props) throws IOException; + + /** + * Serializes Hudi records to a log block. + * + * @param storageConf storage configuration. + * @param records a list of {@link HoodieRecord}. + * @param writerSchema writer schema parsed from the schema string in the log block header. + * @param readerSchema reader schema used to look up record keys, also embedded as the schema of the serialized block. + * @param keyFieldName name of the record key field. + * @param paramsMap additional params for serialization. + * @return byte array after serialization. + * @throws IOException upon serialization error.
+ */ + public abstract byte[] serializeRecordsToLogBlock(StorageConfiguration<?> storageConf, + List<HoodieRecord> records, + Schema writerSchema, + Schema readerSchema, String keyFieldName, + Map<String, String> paramsMap) throws IOException; } diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java index cf5e4b27dd7..edf0d1bc33d 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java @@ -56,9 +56,9 @@ import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; import org.apache.hudi.common.table.view.HoodieTableFileSystemView; -import org.apache.hudi.common.util.BaseFileUtils; import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.ExternalFilePathUtil; +import org.apache.hudi.common.util.FileFormatUtils; import org.apache.hudi.common.util.FileIOUtils; import org.apache.hudi.common.util.HoodieRecordUtils; import org.apache.hudi.common.util.Option; @@ -1175,7 +1175,7 @@ public class HoodieTableMetadataUtil { try { if (filePath.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) { StoragePath fullFilePath = new StoragePath(datasetMetaClient.getBasePathV2(), filePath); - return BaseFileUtils.getInstance(HoodieFileFormat.PARQUET) + return FileFormatUtils.getInstance(HoodieFileFormat.PARQUET) .readColumnStatsFromMetadata(datasetMetaClient.getStorageConf(), fullFilePath, columnsToIndex); } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java index 54f302a85fb..d98470e6444 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java @@ -30,7 +30,7 @@ import org.apache.hudi.common.table.TableSchemaResolver; import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; -import org.apache.hudi.common.util.BaseFileUtils; +import org.apache.hudi.common.util.FileFormatUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.collection.ClosableIterator; @@ -200,7 +200,7 @@ public class BootstrapOperator<I, O extends HoodieRecord<?>> Option<HoodieInstant> latestCommitTime = commitsTimeline.filterCompletedInstants().lastInstant(); if (latestCommitTime.isPresent()) { - BaseFileUtils fileUtils = BaseFileUtils.getInstance(this.hoodieTable.getBaseFileFormat()); + FileFormatUtils fileUtils = FileFormatUtils.getInstance(this.hoodieTable.getBaseFileFormat()); Schema schema = new TableSchemaResolver(this.hoodieTable.getMetaClient()).getTableAvroSchema(); List<FileSlice> fileSlices = this.hoodieTable.getSliceView() diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/HFileUtils.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/HFileUtils.java index 119c0ed5aec..aa691be3573 100644 --- 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/HFileUtils.java +++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/HFileUtils.java @@ -19,13 +19,17 @@ package org.apache.hudi.common.util; +import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.model.HoodieColumnRangeMetadata; import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.util.collection.ClosableIterator; import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.io.compress.CompressionCodec; +import org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase; import org.apache.hudi.io.storage.HoodieFileReader; +import org.apache.hudi.io.storage.HoodieHBaseKVComparator; import org.apache.hudi.io.storage.HoodieIOFactory; import org.apache.hudi.keygen.BaseKeyGenerator; import org.apache.hudi.storage.HoodieStorage; @@ -34,21 +38,50 @@ import org.apache.hudi.storage.StoragePath; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.io.compress.Compression; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.io.hfile.HFile; +import org.apache.hadoop.hbase.io.hfile.HFileContext; +import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; +import java.util.TreeMap; + +import static org.apache.hudi.common.config.HoodieStorageConfig.HFILE_COMPRESSION_ALGORITHM_NAME; +import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes; /** * Utility functions for HFile files. */ -public class HFileUtils extends BaseFileUtils { - +public class HFileUtils extends FileFormatUtils { private static final Logger LOG = LoggerFactory.getLogger(HFileUtils.class); + private static final int DEFAULT_BLOCK_SIZE_FOR_LOG_FILE = 1024 * 1024; + + /** + * Gets the {@link Compression.Algorithm} Enum based on the {@link CompressionCodec} name. + * + * @param paramsMap parameter map containing the compression codec config. + * @return the {@link Compression.Algorithm} Enum. 
+ */ + public static Compression.Algorithm getHFileCompressionAlgorithm(Map<String, String> paramsMap) { + String algoName = paramsMap.get(HFILE_COMPRESSION_ALGORITHM_NAME.key()); + if (StringUtils.isNullOrEmpty(algoName)) { + return Compression.Algorithm.GZ; + } + return Compression.Algorithm.valueOf(algoName.toUpperCase()); + } @Override public List<GenericRecord> readAvroRecords(StorageConfiguration<?> configuration, StoragePath filePath) { @@ -126,4 +159,89 @@ public class HFileUtils extends BaseFileUtils { public void writeMetaFile(HoodieStorage storage, StoragePath filePath, Properties props) throws IOException { throw new UnsupportedOperationException("HFileUtils does not support writeMetaFile"); } + + @Override + public byte[] serializeRecordsToLogBlock(StorageConfiguration<?> storageConf, + List<HoodieRecord> records, + Schema writerSchema, + Schema readerSchema, + String keyFieldName, + Map<String, String> paramsMap) throws IOException { + Compression.Algorithm compressionAlgorithm = getHFileCompressionAlgorithm(paramsMap); + HFileContext context = new HFileContextBuilder() + .withBlockSize(DEFAULT_BLOCK_SIZE_FOR_LOG_FILE) + .withCompression(compressionAlgorithm) + .withCellComparator(new HoodieHBaseKVComparator()) + .build(); + + Configuration conf = storageConf.unwrapAs(Configuration.class); + CacheConfig cacheConfig = new CacheConfig(conf); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + FSDataOutputStream ostream = new FSDataOutputStream(baos, null); + + // Fall back to a simple incrementing counter as the key when the records carry no key field + boolean useIntegerKey = !getRecordKey(records.get(0), readerSchema, keyFieldName).isPresent(); + // Computed once here to avoid re-computing it in the loop + int keyWidth = useIntegerKey ? (int) Math.ceil(Math.log(records.size())) + 1 : -1; + + // Serialize records into bytes + Map<String, List<byte[]>> sortedRecordsMap = new TreeMap<>(); + + Iterator<HoodieRecord> itr = records.iterator(); + int id = 0; + while (itr.hasNext()) { + HoodieRecord<?> record = itr.next(); + String recordKey; + if (useIntegerKey) { + recordKey = String.format("%" + keyWidth + "s", id++); + } else { + recordKey = getRecordKey(record, readerSchema, keyFieldName).get(); + } + + final byte[] recordBytes = serializeRecord(record, writerSchema, keyFieldName); + // Append the serialized record to this key's list, creating the list on first use.
+ sortedRecordsMap.computeIfAbsent(recordKey, k -> new ArrayList<>()).add(recordBytes); + } + + HFile.Writer writer = HFile.getWriterFactory(conf, cacheConfig) + .withOutputStream(ostream).withFileContext(context).create(); + + // Write the records + sortedRecordsMap.forEach((recordKey, recordBytesList) -> { for (byte[] recordBytes : recordBytesList) { try { KeyValue kv = new KeyValue(getUTF8Bytes(recordKey), null, null, recordBytes); writer.append(kv); } catch (IOException e) { throw new HoodieIOException("IOException serializing records", e); } } }); + + writer.appendFileInfo( getUTF8Bytes(HoodieAvroHFileReaderImplBase.SCHEMA_KEY), getUTF8Bytes(readerSchema.toString())); + + writer.close(); + ostream.flush(); + ostream.close(); + + return baos.toByteArray(); + } + + private static Option<String> getRecordKey(HoodieRecord record, Schema readerSchema, String keyFieldName) { + return Option.ofNullable(record.getRecordKey(readerSchema, keyFieldName)); + } + + private static byte[] serializeRecord(HoodieRecord<?> record, Schema schema, String keyFieldName) throws IOException { + Option<Schema.Field> keyField = Option.ofNullable(schema.getField(keyFieldName)); + // Reset the key value within the record to avoid duplicating the key within the payload + if (keyField.isPresent()) { + record.truncateRecordKey(schema, new Properties(), keyField.get().name()); + } + return HoodieAvroUtils.recordToBytes(record, schema).get(); + } }
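Codec resolution on the HFile path is case-insensitive and defaults to GZ when the config is absent or empty. A quick illustration of getHFileCompressionAlgorithm, mirroring the new TestHFileUtils below:

  // A named codec resolves case-insensitively: "snappy" -> SNAPPY.
  Map<String, String> params = Collections.singletonMap(
      HFILE_COMPRESSION_ALGORITHM_NAME.key(), "snappy");
  Compression.Algorithm algo = HFileUtils.getHFileCompressionAlgorithm(params);

  // An unset (or empty) config falls back to GZ.
  Compression.Algorithm fallback =
      HFileUtils.getHFileCompressionAlgorithm(Collections.emptyMap());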
diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java index d45d8eb4733..8727ca5041d 100644 --- a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java +++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java @@ -69,7 +69,7 @@ import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToHadoopPath; /** * Utility functions for ORC files. */ -public class OrcUtils extends BaseFileUtils { +public class OrcUtils extends FileFormatUtils { /** * Provides a closable iterator for reading the given ORC file. @@ -303,4 +303,13 @@ public class OrcUtils extends BaseFileUtils { } } } + + @Override + public byte[] serializeRecordsToLogBlock(StorageConfiguration<?> storageConf, + List<HoodieRecord> records, + Schema writerSchema, + Schema readerSchema, String keyFieldName, + Map<String, String> paramsMap) throws IOException { + throw new UnsupportedOperationException("Hudi log blocks do not support ORC format yet"); + } } diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java index 9d7ac5c6623..ad42567e647 100644 --- a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java +++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java @@ -21,6 +21,7 @@ package org.apache.hudi.common.util; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.avro.HoodieAvroWriteSupport; +import org.apache.hudi.common.config.HoodieConfig; import org.apache.hudi.common.model.HoodieColumnRangeMetadata; import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.HoodieKey; @@ -28,6 +29,8 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.util.collection.ClosableIterator; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.MetadataNotFoundException; +import org.apache.hudi.io.storage.HoodieFileWriter; +import org.apache.hudi.io.storage.HoodieFileWriterFactory; import org.apache.hudi.keygen.BaseKeyGenerator; import org.apache.hudi.storage.HoodieStorage; import org.apache.hudi.storage.HoodieStorageUtils; @@ -59,6 +62,7 @@ import org.slf4j.LoggerFactory; import javax.annotation.Nonnull; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.math.BigDecimal; import java.math.BigInteger; @@ -74,10 +78,14 @@ import java.util.stream.Collector; import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_BLOCK_SIZE; +import static org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_MAX_FILE_SIZE; +import static org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_PAGE_SIZE; + /** * Utility functions for Parquet files. */ -public class ParquetUtils extends BaseFileUtils { +public class ParquetUtils extends FileFormatUtils { private static final Logger LOG = LoggerFactory.getLogger(ParquetUtils.class); @@ -148,6 +156,14 @@ public class ParquetUtils extends BaseFileUtils { return rowKeys; } + /** + * @param codecName codec name as a String. + * @return {@link CompressionCodecName} Enum. + */ + public static CompressionCodecName getCompressionCodecName(String codecName) { + return CompressionCodecName.fromConf(StringUtils.isNullOrEmpty(codecName) ? null : codecName); + } + /** * Fetch {@link HoodieKey}s from the given parquet file.
* @@ -358,6 +374,41 @@ public class ParquetUtils extends BaseFileUtils { } } + @Override + public byte[] serializeRecordsToLogBlock(StorageConfiguration<?> storageConf, + List<HoodieRecord> records, + Schema writerSchema, + Schema readerSchema, + String keyFieldName, + Map<String, String> paramsMap) throws IOException { + if (records.isEmpty()) { + return new byte[0]; + } + + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + HoodieConfig config = new HoodieConfig(); + paramsMap.forEach(config::setValue); + config.setValue(PARQUET_BLOCK_SIZE.key(), String.valueOf(ParquetWriter.DEFAULT_BLOCK_SIZE)); + config.setValue(PARQUET_PAGE_SIZE.key(), String.valueOf(ParquetWriter.DEFAULT_PAGE_SIZE)); + config.setValue(PARQUET_MAX_FILE_SIZE.key(), String.valueOf(1024 * 1024 * 1024)); + HoodieRecord.HoodieRecordType recordType = records.iterator().next().getRecordType(); + HoodieFileWriter parquetWriter = null; + try { + parquetWriter = HoodieFileWriterFactory.getFileWriter( + HoodieFileFormat.PARQUET, outputStream, storageConf, config, writerSchema, recordType); + for (HoodieRecord<?> record : records) { + String recordKey = record.getRecordKey(readerSchema, keyFieldName); + parquetWriter.write(recordKey, record, writerSchema); + } + outputStream.flush(); + } finally { + if (parquetWriter != null) { + parquetWriter.close(); + } + } + return outputStream.toByteArray(); + } + static class RecordKeysFilterFunction implements Function<String, Boolean> { private final Set<String> candidateKeys; diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroOrcReader.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroOrcReader.java index 116f36d7822..9f8b453535b 100644 --- a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroOrcReader.java +++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroOrcReader.java @@ -22,7 +22,7 @@ package org.apache.hudi.io.hadoop; import org.apache.hudi.common.bloom.BloomFilter; import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.util.AvroOrcUtils; -import org.apache.hudi.common.util.BaseFileUtils; +import org.apache.hudi.common.util.FileFormatUtils; import org.apache.hudi.common.util.collection.ClosableIterator; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.io.storage.HoodieAvroFileReader; @@ -52,12 +52,12 @@ public class HoodieAvroOrcReader extends HoodieAvroFileReader { private final StoragePath path; private final StorageConfiguration<?> conf; - private final BaseFileUtils orcUtils; + private final FileFormatUtils orcUtils; public HoodieAvroOrcReader(StorageConfiguration<?> configuration, StoragePath path) { this.conf = configuration; this.path = path; - this.orcUtils = BaseFileUtils.getInstance(HoodieFileFormat.ORC); + this.orcUtils = FileFormatUtils.getInstance(HoodieFileFormat.ORC); } @Override
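ParquetUtils.getCompressionCodecName above is the Parquet-side counterpart of the HFile helper: it funnels a String codec name into CompressionCodecName.fromConf, so a null or empty name takes whatever fromConf(null) yields (UNCOMPRESSED in current Parquet releases). Illustrative sketch:

  // Codec names resolve (case-insensitively in current Parquet) onto the enum.
  CompressionCodecName codec = ParquetUtils.getCompressionCodecName("gzip");  // GZIP

  // Null/empty input is delegated to CompressionCodecName.fromConf(null).
  CompressionCodecName fallback = ParquetUtils.getCompressionCodecName("");

Note that serializeRecordsToLogBlock pins block and page sizes to the ParquetWriter defaults and caps the in-memory file size at 1 GB before handing the stream to HoodieFileWriterFactory.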
diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReader.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReader.java index 25ad701e01d..76614dfea95 100644 --- a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReader.java +++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReader.java @@ -24,7 +24,7 @@ import org.apache.hudi.common.bloom.BloomFilter; import org.apache.hudi.common.model.HoodieAvroIndexedRecord; import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.util.BaseFileUtils; +import org.apache.hudi.common.util.FileFormatUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ParquetReaderIterator; import org.apache.hudi.common.util.collection.ClosableIterator; @@ -58,7 +58,7 @@ public class HoodieAvroParquetReader extends HoodieAvroFileReader { private final StoragePath path; private final StorageConfiguration<?> conf; - private final BaseFileUtils parquetUtils; + private final FileFormatUtils parquetUtils; private final List<ParquetReaderIterator> readerIterators = new ArrayList<>(); public HoodieAvroParquetReader(StorageConfiguration<?> storageConf, StoragePath path) { @@ -66,7 +66,7 @@ public class HoodieAvroParquetReader extends HoodieAvroFileReader { // by the Reader (for proper config propagation to Parquet components) this.conf = tryOverrideDefaultConfigs(storageConf.newInstance()); this.path = path; - this.parquetUtils = BaseFileUtils.getInstance(HoodieFileFormat.PARQUET); + this.parquetUtils = FileFormatUtils.getInstance(HoodieFileFormat.PARQUET); } @Override diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java index db3c0e9354d..f7a98a4b2fe 100755 --- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java +++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java @@ -75,8 +75,6 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.io.compress.Compression; -import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.parquet.hadoop.util.counters.BenchmarkCounter; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; @@ -112,6 +110,8 @@ import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.apache.hudi.common.config.HoodieStorageConfig.HFILE_COMPRESSION_ALGORITHM_NAME; +import static org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME; import static org.apache.hudi.common.testutils.HoodieTestUtils.getJavaVersion; import static org.apache.hudi.common.testutils.HoodieTestUtils.shouldUseExternalHdfs; import static org.apache.hudi.common.testutils.HoodieTestUtils.useExternalHdfs; @@ -2690,9 +2690,9 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { case AVRO_DATA_BLOCK: return new HoodieAvroDataBlock(records, header, HoodieRecord.RECORD_KEY_METADATA_FIELD); case HFILE_DATA_BLOCK: - return new HoodieHFileDataBlock(records, header, Compression.Algorithm.GZ, pathForReader, HoodieReaderConfig.USE_NATIVE_HFILE_READER.defaultValue()); + return new HoodieHFileDataBlock(records, header, HFILE_COMPRESSION_ALGORITHM_NAME.defaultValue(), pathForReader, HoodieReaderConfig.USE_NATIVE_HFILE_READER.defaultValue()); case PARQUET_DATA_BLOCK: - return new HoodieParquetDataBlock(records, header, HoodieRecord.RECORD_KEY_METADATA_FIELD, CompressionCodecName.GZIP, 0.1, true); + return new HoodieParquetDataBlock(records, header, HoodieRecord.RECORD_KEY_METADATA_FIELD, PARQUET_COMPRESSION_CODEC_NAME.defaultValue(), 0.1, true); default: throw new RuntimeException("Unknown data block type " +
dataBlockType); } diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestHFileUtils.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestHFileUtils.java new file mode 100644 index 00000000000..c88dced4ab3 --- /dev/null +++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestHFileUtils.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.common.util; + +import org.apache.hadoop.hbase.io.compress.Compression; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; + +import java.util.Collections; +import java.util.Map; + +import static org.apache.hudi.common.config.HoodieStorageConfig.HFILE_COMPRESSION_ALGORITHM_NAME; +import static org.apache.hudi.common.util.HFileUtils.getHFileCompressionAlgorithm; +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** + * Tests {@link HFileUtils} + */ +public class TestHFileUtils { + @ParameterizedTest + @EnumSource(Compression.Algorithm.class) + public void testGetHFileCompressionAlgorithm(Compression.Algorithm algo) { + for (boolean upperCase : new boolean[] {true, false}) { + Map<String, String> paramsMap = Collections.singletonMap( + HFILE_COMPRESSION_ALGORITHM_NAME.key(), + upperCase ? 
algo.getName().toUpperCase() : algo.getName().toLowerCase()); + assertEquals(algo, getHFileCompressionAlgorithm(paramsMap)); + } + } + + @Test + public void testGetHFileCompressionAlgorithmWithEmptyString() { + assertEquals(Compression.Algorithm.GZ, getHFileCompressionAlgorithm( + Collections.singletonMap(HFILE_COMPRESSION_ALGORITHM_NAME.key(), ""))); + } + + @Test + public void testGetDefaultHFileCompressionAlgorithm() { + assertEquals(Compression.Algorithm.GZ, getHFileCompressionAlgorithm(Collections.emptyMap())); + } +} diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java index 7cdf3e6af29..f489102e6bb 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java @@ -49,12 +49,10 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RawLocalFileSystem; -import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; import org.apache.hadoop.mapred.JobConf; import org.apache.parquet.avro.AvroParquetWriter; -import org.apache.parquet.hadoop.metadata.CompressionCodecName; import java.io.File; import java.io.IOException; @@ -70,6 +68,9 @@ import java.util.Objects; import java.util.UUID; import java.util.stream.Collectors; +import static org.apache.hudi.common.config.HoodieStorageConfig.HFILE_COMPRESSION_ALGORITHM_NAME; +import static org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME; + public class InputFormatTestUtil { private static String TEST_WRITE_TOKEN = "1-0-1"; @@ -413,9 +414,9 @@ public class InputFormatTestUtil { List<HoodieRecord> hoodieRecords = records.stream().map(HoodieAvroIndexedRecord::new).collect(Collectors.toList()); if (logBlockType == HoodieLogBlock.HoodieLogBlockType.HFILE_DATA_BLOCK) { dataBlock = new HoodieHFileDataBlock( - hoodieRecords, header, Compression.Algorithm.GZ, writer.getLogFile().getPath(), HoodieReaderConfig.USE_NATIVE_HFILE_READER.defaultValue()); + hoodieRecords, header, HFILE_COMPRESSION_ALGORITHM_NAME.defaultValue(), writer.getLogFile().getPath(), HoodieReaderConfig.USE_NATIVE_HFILE_READER.defaultValue()); } else if (logBlockType == HoodieLogBlock.HoodieLogBlockType.PARQUET_DATA_BLOCK) { - dataBlock = new HoodieParquetDataBlock(hoodieRecords, header, HoodieRecord.RECORD_KEY_METADATA_FIELD, CompressionCodecName.GZIP, 0.1, true); + dataBlock = new HoodieParquetDataBlock(hoodieRecords, header, HoodieRecord.RECORD_KEY_METADATA_FIELD, PARQUET_COMPRESSION_CODEC_NAME.defaultValue(), 0.1, true); } else { dataBlock = new HoodieAvroDataBlock(hoodieRecords, header, HoodieRecord.RECORD_KEY_METADATA_FIELD); }
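Both test utilities now thread the codec through as the String name carried by HoodieStorageConfig rather than the hbase/parquet enums, which is what lets those two imports drop out. A condensed sketch of the updated construction (hoodieRecords, header, and logFilePath stand in for test fixture state):

  HoodieDataBlock hfileBlock = new HoodieHFileDataBlock(
      hoodieRecords, header,
      HFILE_COMPRESSION_ALGORITHM_NAME.defaultValue(),   // codec passed as a String now
      logFilePath,
      HoodieReaderConfig.USE_NATIVE_HFILE_READER.defaultValue());

  HoodieDataBlock parquetBlock = new HoodieParquetDataBlock(
      hoodieRecords, header, HoodieRecord.RECORD_KEY_METADATA_FIELD,
      PARQUET_COMPRESSION_CODEC_NAME.defaultValue(),     // codec passed as a String now
      0.1, true);                                        // trailing args unchanged from the call sites above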
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/SparkHelpers.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/SparkHelpers.scala index 791435f4bb7..c2a717e2764 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/SparkHelpers.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/SparkHelpers.scala @@ -23,14 +23,14 @@ import org.apache.hudi.common.bloom.{BloomFilter, BloomFilterFactory} import org.apache.hudi.common.config.HoodieStorageConfig import org.apache.hudi.common.config.HoodieStorageConfig.{BLOOM_FILTER_DYNAMIC_MAX_ENTRIES, BLOOM_FILTER_FPP_VALUE, BLOOM_FILTER_NUM_ENTRIES_VALUE, BLOOM_FILTER_TYPE} import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord} -import org.apache.hudi.common.util.{BaseFileUtils, Option} +import org.apache.hudi.common.util.{FileFormatUtils, Option} +import org.apache.hudi.io.hadoop.HoodieAvroParquetWriter import org.apache.hudi.io.storage.HoodieParquetConfig import org.apache.hudi.storage.{HoodieStorage, StorageConfiguration, StoragePath} import org.apache.avro.Schema import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.FileSystem -import org.apache.hudi.io.hadoop.HoodieAvroParquetWriter import org.apache.parquet.avro.AvroSchemaConverter import org.apache.parquet.hadoop.metadata.CompressionCodecName import org.apache.spark.sql.{DataFrame, SQLContext} @@ -48,7 +48,7 @@ object SparkHelpers { sourceFile: StoragePath, destinationFile: StoragePath, keysToSkip: Set[String]) { - val sourceRecords = BaseFileUtils.getInstance(HoodieFileFormat.PARQUET).readAvroRecords(conf, sourceFile).asScala + val sourceRecords = FileFormatUtils.getInstance(HoodieFileFormat.PARQUET).readAvroRecords(conf, sourceFile).asScala val schema: Schema = sourceRecords.head.getSchema val filter: BloomFilter = BloomFilterFactory.createBloomFilter( BLOOM_FILTER_NUM_ENTRIES_VALUE.defaultValue.toInt, BLOOM_FILTER_FPP_VALUE.defaultValue.toDouble, @@ -140,7 +140,7 @@ class SparkHelper(sqlContext: SQLContext, fs: FileSystem) { * @return */ def fileKeysAgainstBF(conf: StorageConfiguration[_], sqlContext: SQLContext, file: String): Boolean = { - val bf = BaseFileUtils.getInstance(HoodieFileFormat.PARQUET).readBloomFilterFromMetadata(conf, new StoragePath(file)) + val bf = FileFormatUtils.getInstance(HoodieFileFormat.PARQUET).readBloomFilterFromMetadata(conf, new StoragePath(file)) val foundCount = sqlContext.parquetFile(file) .select(s"`${HoodieRecord.RECORD_KEY_METADATA_FIELD}`") .collect().count(r => !bf.mightContain(r.getString(0))) diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/ColumnStatsIndexHelper.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/ColumnStatsIndexHelper.java index 11abebbb245..8ff46be7621 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/ColumnStatsIndexHelper.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/ColumnStatsIndexHelper.java @@ -19,7 +19,7 @@ package org.apache.hudi; import org.apache.hudi.common.model.HoodieColumnRangeMetadata; import org.apache.hudi.common.model.HoodieFileFormat; -import org.apache.hudi.common.util.BaseFileUtils; +import org.apache.hudi.common.util.FileFormatUtils; import org.apache.hudi.common.util.ParquetUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieException; @@ -174,7 +174,7 @@ public class ColumnStatsIndexHelper { colMinMaxInfos = jsc.parallelize(baseFilesPaths, numParallelism) .mapPartitions(paths -> { - ParquetUtils utils = (ParquetUtils) BaseFileUtils.getInstance(HoodieFileFormat.PARQUET); + ParquetUtils utils = (ParquetUtils) FileFormatUtils.getInstance(HoodieFileFormat.PARQUET); Iterable<String> iterable = () -> paths; return StreamSupport.stream(iterable.spliterator(), false) .flatMap(path -> diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java index c2237e32cee..f856f35367c 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java @@ -49,9 +49,9 @@ import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.view.FileSystemViewManager; import org.apache.hudi.common.table.view.HoodieTableFileSystemView; -import org.apache.hudi.common.util.BaseFileUtils; import org.apache.hudi.common.util.CleanerUtils; import org.apache.hudi.common.util.ConfigUtils; +import org.apache.hudi.common.util.FileFormatUtils; import org.apache.hudi.common.util.FileIOUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.VisibleForTesting; @@ -64,9 +64,9 @@ import org.apache.hudi.hadoop.fs.HadoopFSUtils; import org.apache.hudi.io.storage.HoodieFileReader; import org.apache.hudi.metadata.HoodieTableMetadata; import org.apache.hudi.metadata.HoodieTableMetadataUtil; +import org.apache.hudi.metadata.MetadataPartitionType; import org.apache.hudi.storage.HoodieStorage; import org.apache.hudi.storage.StoragePath; -import org.apache.hudi.metadata.MetadataPartitionType; import org.apache.hudi.utilities.util.BloomFilterData; import com.beust.jcommander.JCommander; @@ -1439,7 +1439,7 @@ public class HoodieMetadataTableValidator implements Serializable { .collect(Collectors.toList()); } else { return baseFileNameList.stream().flatMap(filename -> - BaseFileUtils.getInstance(HoodieFileFormat.PARQUET).readColumnStatsFromMetadata( + FileFormatUtils.getInstance(HoodieFileFormat.PARQUET).readColumnStatsFromMetadata( metaClient.getStorageConf(), new StoragePath(FSUtils.constructAbsolutePath(metaClient.getBasePathV2(), partitionPath), filename), allColumnNameList).stream())
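On the read and validation side the change stays mechanical: only the type name moves from BaseFileUtils to FileFormatUtils, and the factory keeps its contract. Call-shape sketch (metaClient, filePath, and allColumnNameList are assumed caller state, and the List<HoodieColumnRangeMetadata<Comparable>> return type is assumed carried over unchanged from BaseFileUtils):

  FileFormatUtils utils = FileFormatUtils.getInstance(HoodieFileFormat.PARQUET);
  List<HoodieColumnRangeMetadata<Comparable>> colStats =
      utils.readColumnStatsFromMetadata(
          metaClient.getStorageConf(),  // StorageConfiguration<?>
          filePath,                     // StoragePath of the base file
          allColumnNameList);           // columns whose stats to read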
