This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new a8da2fdd1c0 [HUDI-7752] Abstract serializeRecords for log writing (#11210)
a8da2fdd1c0 is described below
commit a8da2fdd1c08cbd86d99d48cc283f03f1fc2090d
Author: Y Ethan Guo <[email protected]>
AuthorDate: Tue May 14 11:57:15 2024 -0700
[HUDI-7752] Abstract serializeRecords for log writing (#11210)
---
.../org/apache/hudi/config/HoodieWriteConfig.java | 11 +-
.../hudi/io/HoodieKeyLocationFetchHandle.java | 8 +-
.../row/HoodieRowDataFileWriterFactory.java | 3 +-
.../client/TestHoodieJavaWriteClientInsert.java | 6 +-
.../commit/TestJavaCopyOnWriteActionExecutor.java | 6 +-
.../testutils/HoodieJavaClientTestHarness.java | 12 +-
.../hudi/io/storage/HoodieSparkParquetReader.java | 6 +-
.../row/HoodieInternalRowFileWriterFactory.java | 3 +-
.../hudi/client/TestUpdateSchemaEvolution.java | 4 +-
.../TestHoodieClientOnCopyOnWriteStorage.java | 6 +-
.../commit/TestCopyOnWriteActionExecutor.java | 6 +-
.../hudi/testutils/HoodieClientTestBase.java | 6 +-
.../hudi/common/model/HoodiePartitionMetadata.java | 6 +-
.../hudi/common/table/TableSchemaResolver.java | 6 +-
.../table/log/block/HoodieHFileDataBlock.java | 107 ++----------------
.../table/log/block/HoodieParquetDataBlock.java | 47 ++------
.../{BaseFileUtils.java => FileFormatUtils.java} | 29 ++++-
.../hudi/metadata/HoodieTableMetadataUtil.java | 8 +-
.../testutils/reader/HoodieFileSliceTestUtils.java | 7 +-
.../apache/hudi/common/util/TestBaseFileUtils.java | 6 +-
.../hudi/sink/bootstrap/BootstrapOperator.java | 4 +-
.../org/apache/hudi/common/util/HFileUtils.java | 122 ++++++++++++++++++++-
.../java/org/apache/hudi/common/util/OrcUtils.java | 11 +-
.../org/apache/hudi/common/util/ParquetUtils.java | 47 +++++++-
.../apache/hudi/io/hadoop/HoodieAvroOrcReader.java | 6 +-
.../hudi/io/hadoop/HoodieAvroParquetReader.java | 6 +-
.../common/functional/TestHoodieLogFormat.java | 8 +-
.../apache/hudi/common/util/TestHFileUtils.java | 59 ++++++++++
.../hudi/hadoop/testutils/InputFormatTestUtil.java | 9 +-
.../org/apache/spark/sql/hudi/SparkHelpers.scala | 8 +-
.../org/apache/hudi/ColumnStatsIndexHelper.java | 4 +-
.../utilities/HoodieMetadataTableValidator.java | 6 +-
32 files changed, 364 insertions(+), 219 deletions(-)
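In essence, this commit pulls the HFile- and Parquet-specific serialization code out of the log data blocks and behind a new FileFormatUtils.serializeRecordsToLogBlock(...) abstraction (BaseFileUtils is renamed to FileFormatUtils). A rough sketch of the resulting call pattern, using only classes and signatures that appear in the diff below; the wrapper class, method, and variable names here are illustrative, not part of the commit:

  import org.apache.avro.Schema;
  import org.apache.hudi.common.model.HoodieFileFormat;
  import org.apache.hudi.common.model.HoodieRecord;
  import org.apache.hudi.common.util.FileFormatUtils;
  import org.apache.hudi.storage.StorageConfiguration;

  import java.io.IOException;
  import java.util.HashMap;
  import java.util.List;
  import java.util.Map;

  import static org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME;

  class LogBlockSerializationSketch {
    byte[] serialize(StorageConfiguration<?> storageConf, List<HoodieRecord> records,
                     Schema writerSchema, Schema readerSchema, String keyFieldName) throws IOException {
      // Format-specific knobs are now passed as plain strings, so hudi-common no longer
      // needs the Parquet or HBase enum types on its compile path.
      Map<String, String> params = new HashMap<>();
      params.put(PARQUET_COMPRESSION_CODEC_NAME.key(), PARQUET_COMPRESSION_CODEC_NAME.defaultValue());
      return FileFormatUtils.getInstance(HoodieFileFormat.PARQUET)
          .serializeRecordsToLogBlock(storageConf, records, writerSchema, readerSchema, keyFieldName, params);
    }
  }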
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index fe479549d7d..2248ce03f7a 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -84,9 +84,7 @@ import org.apache.hudi.table.action.compact.CompactionTriggerStrategy;
import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
import org.apache.hudi.table.storage.HoodieStorageLayout;
-import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.orc.CompressionKind;
-import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -2137,9 +2135,8 @@ public class HoodieWriteConfig extends HoodieConfig {
return getDouble(HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION);
}
- public CompressionCodecName getParquetCompressionCodec() {
- String codecName = getString(HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME);
- return CompressionCodecName.fromConf(StringUtils.isNullOrEmpty(codecName) ? null : codecName);
+ public String getParquetCompressionCodec() {
+ return getString(HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME);
}
public boolean parquetDictionaryEnabled() {
@@ -2183,8 +2180,8 @@ public class HoodieWriteConfig extends HoodieConfig {
return getInt(HoodieStorageConfig.HFILE_BLOCK_SIZE);
}
- public Compression.Algorithm getHFileCompressionAlgorithm() {
- return Compression.Algorithm.valueOf(getString(HoodieStorageConfig.HFILE_COMPRESSION_ALGORITHM_NAME));
+ public String getHFileCompressionAlgorithm() {
+ return getString(HoodieStorageConfig.HFILE_COMPRESSION_ALGORITHM_NAME);
}
public long getOrcMaxFileSize() {
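Because the write-config getters above now return the raw config strings, the translation to format-specific enums moves into the format utils. A hedged sketch of what the writer factories further down do with the Parquet codec (the wrapper class is illustrative; the called methods come from the diff):

  import org.apache.hudi.common.util.ParquetUtils;
  import org.apache.hudi.config.HoodieWriteConfig;
  import org.apache.parquet.hadoop.metadata.CompressionCodecName;

  class CodecResolutionSketch {
    // The string from HoodieWriteConfig is resolved to the Parquet enum only where
    // Parquet is actually needed, e.g. inside the row writer factories below.
    static CompressionCodecName resolve(HoodieWriteConfig writeConfig) {
      return ParquetUtils.getCompressionCodecName(writeConfig.getParquetCompressionCodec());
    }
  }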
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java
index f05a0af3449..734a012d4bb 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java
@@ -22,7 +22,7 @@ import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
import org.apache.hudi.common.model.HoodieRecordLocation;
-import org.apache.hudi.common.util.BaseFileUtils;
+import org.apache.hudi.common.util.FileFormatUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -50,11 +50,11 @@ public class HoodieKeyLocationFetchHandle<T, I, K, O> extends HoodieReadHandle<T
}
private List<Pair<HoodieKey, Long>> fetchRecordKeysWithPositions(HoodieBaseFile baseFile) {
- BaseFileUtils baseFileUtils = BaseFileUtils.getInstance(baseFile.getStoragePath());
+ FileFormatUtils fileFormatUtils = FileFormatUtils.getInstance(baseFile.getStoragePath());
if (keyGeneratorOpt.isPresent()) {
- return baseFileUtils.fetchRecordKeysWithPositions(hoodieTable.getStorageConf(), baseFile.getStoragePath(), keyGeneratorOpt);
+ return fileFormatUtils.fetchRecordKeysWithPositions(hoodieTable.getStorageConf(), baseFile.getStoragePath(), keyGeneratorOpt);
} else {
- return baseFileUtils.fetchRecordKeysWithPositions(hoodieTable.getStorageConf(), baseFile.getStoragePath());
+ return fileFormatUtils.fetchRecordKeysWithPositions(hoodieTable.getStorageConf(), baseFile.getStoragePath());
}
}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriterFactory.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriterFactory.java
index be757a30954..8d2a87a5110 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriterFactory.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriterFactory.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.fs.Path;
import java.io.IOException;
import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
+import static org.apache.hudi.common.util.ParquetUtils.getCompressionCodecName;
import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePath;
/**
@@ -73,7 +74,7 @@ public class HoodieRowDataFileWriterFactory {
return new HoodieRowDataParquetWriter(
convertToStoragePath(path), new HoodieParquetConfig<>(
writeSupport,
- writeConfig.getParquetCompressionCodec(),
+ getCompressionCodecName(writeConfig.getParquetCompressionCodec()),
writeConfig.getParquetBlockSize(),
writeConfig.getParquetPageSize(),
writeConfig.getParquetMaxFileSize(),
diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestHoodieJavaWriteClientInsert.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestHoodieJavaWriteClientInsert.java
index 5a9d9dc2688..c5be86e7405 100644
--- a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestHoodieJavaWriteClientInsert.java
+++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestHoodieJavaWriteClientInsert.java
@@ -31,7 +31,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.testutils.RawTripTestPayload;
-import org.apache.hudi.common.util.BaseFileUtils;
+import org.apache.hudi.common.util.FileFormatUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -147,7 +147,7 @@ public class TestHoodieJavaWriteClientInsert extends HoodieJavaClientTestHarness
HoodieJavaWriteClient writeClient = getHoodieWriteClient(config);
metaClient = HoodieTableMetaClient.reload(metaClient);
- BaseFileUtils fileUtils = getFileUtilsInstance(metaClient);
+ FileFormatUtils fileUtils = getFileUtilsInstance(metaClient);
// Get some records belong to the same partition (2021/09/11)
String insertRecordStr1 = "{\"_row_key\":\"1\","
@@ -221,7 +221,7 @@ public class TestHoodieJavaWriteClientInsert extends HoodieJavaClientTestHarness
HoodieJavaWriteClient writeClient = getHoodieWriteClient(config);
metaClient = HoodieTableMetaClient.reload(metaClient);
- BaseFileUtils fileUtils = getFileUtilsInstance(metaClient);
+ FileFormatUtils fileUtils = getFileUtilsInstance(metaClient);
String partitionPath = "2021/09/11";
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new String[]{partitionPath});
diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/table/action/commit/TestJavaCopyOnWriteActionExecutor.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/table/action/commit/TestJavaCopyOnWriteActionExecutor.java
index e2ff6065192..f9262c7f939 100644
--- a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/table/action/commit/TestJavaCopyOnWriteActionExecutor.java
+++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/table/action/commit/TestJavaCopyOnWriteActionExecutor.java
@@ -34,7 +34,7 @@ import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.testutils.RawTripTestPayload;
import org.apache.hudi.common.testutils.Transformations;
-import org.apache.hudi.common.util.BaseFileUtils;
+import org.apache.hudi.common.util.FileFormatUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -131,7 +131,7 @@ public class TestJavaCopyOnWriteActionExecutor extends HoodieJavaClientTestHarne
HoodieJavaWriteClient writeClient = getHoodieWriteClient(config);
writeClient.startCommitWithTime(firstCommitTime);
metaClient = HoodieTableMetaClient.reload(metaClient);
- BaseFileUtils fileUtils = getFileUtilsInstance(metaClient);
+ FileFormatUtils fileUtils = getFileUtilsInstance(metaClient);
String partitionPath = "2016/01/31";
@@ -480,7 +480,7 @@ public class TestJavaCopyOnWriteActionExecutor extends HoodieJavaClientTestHarne
HoodieJavaWriteClient writeClient = getHoodieWriteClient(config);
writeClient.startCommitWithTime(firstCommitTime);
metaClient = HoodieTableMetaClient.reload(metaClient);
- BaseFileUtils fileUtils = getFileUtilsInstance(metaClient);
+ FileFormatUtils fileUtils = getFileUtilsInstance(metaClient);
String partitionPath = "2022/04/09";
diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java
index 61429b3fef2..de3c9612f85 100644
--- a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java
+++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java
@@ -50,7 +50,7 @@ import org.apache.hudi.common.table.view.SyncableFileSystemView;
import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.common.testutils.HoodieTestUtils;
-import org.apache.hudi.common.util.BaseFileUtils;
+import org.apache.hudi.common.util.FileFormatUtils;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
@@ -908,7 +908,7 @@ public abstract class HoodieJavaClientTestHarness extends HoodieWriterClientTest
HashMap<String, String> paths = getLatestFileIDsToFullPath(basePath, commitTimeline, Arrays.asList(commitInstant));
return paths.values().stream().map(StoragePath::new).flatMap(path ->
- BaseFileUtils.getInstance(path).readAvroRecords(context.getStorageConf(), path).stream())
+ FileFormatUtils.getInstance(path).readAvroRecords(context.getStorageConf(), path).stream())
.filter(record -> {
if (filterByCommitTime) {
Object commitTime = record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD);
@@ -937,7 +937,7 @@ public abstract class HoodieJavaClientTestHarness extends HoodieWriterClientTest
try {
List<HoodieBaseFile> latestFiles = getLatestBaseFiles(basePath, storage, paths);
return latestFiles.stream().mapToLong(baseFile ->
- BaseFileUtils.getInstance(baseFile.getStoragePath())
+ FileFormatUtils.getInstance(baseFile.getStoragePath())
.readAvroRecords(context.getStorageConf(), baseFile.getStoragePath()).size())
.sum();
} catch (Exception e) {
@@ -975,7 +975,7 @@ public abstract class HoodieJavaClientTestHarness extends HoodieWriterClientTest
HashMap<String, String> fileIdToFullPath = getLatestFileIDsToFullPath(basePath, commitTimeline, commitsToReturn);
String[] paths = fileIdToFullPath.values().toArray(new String[fileIdToFullPath.size()]);
if (paths[0].endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
- return Arrays.stream(paths).map(StoragePath::new).flatMap(path -> BaseFileUtils.getInstance(path).readAvroRecords(context.getStorageConf(), path).stream())
+ return Arrays.stream(paths).map(StoragePath::new).flatMap(path -> FileFormatUtils.getInstance(path).readAvroRecords(context.getStorageConf(), path).stream())
.filter(record -> {
if (lastCommitTimeOpt.isPresent()) {
Object commitTime = record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD);
@@ -1022,8 +1022,8 @@ public abstract class HoodieJavaClientTestHarness extends HoodieWriterClientTest
return builder;
}
- public static BaseFileUtils getFileUtilsInstance(HoodieTableMetaClient metaClient) {
- return BaseFileUtils.getInstance(metaClient.getTableConfig().getBaseFileFormat());
+ public static FileFormatUtils getFileUtilsInstance(HoodieTableMetaClient metaClient) {
+ return FileFormatUtils.getInstance(metaClient.getTableConfig().getBaseFileFormat());
}
protected HoodieTableMetaClient createMetaClient() {
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java
index 987bcf8ddd4..7203eb8a572 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java
@@ -24,7 +24,7 @@ import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieSparkRecord;
-import org.apache.hudi.common.util.BaseFileUtils;
+import org.apache.hudi.common.util.FileFormatUtils;
import org.apache.hudi.common.util.ParquetReaderIterator;
import org.apache.hudi.common.util.ParquetUtils;
import org.apache.hudi.common.util.StringUtils;
@@ -61,7 +61,7 @@ public class HoodieSparkParquetReader implements HoodieSparkFileReader {
private final StoragePath path;
private final StorageConfiguration<?> conf;
- private final BaseFileUtils parquetUtils;
+ private final FileFormatUtils parquetUtils;
private List<ParquetReaderIterator> readerIterators = new ArrayList<>();
public HoodieSparkParquetReader(StorageConfiguration<?> conf, StoragePath path) {
@@ -69,7 +69,7 @@ public class HoodieSparkParquetReader implements HoodieSparkFileReader {
this.conf = conf.newInstance();
// Avoid adding record in list element when convert parquet schema to avro schema
conf.set(ADD_LIST_ELEMENT_RECORDS, "false");
- this.parquetUtils = BaseFileUtils.getInstance(HoodieFileFormat.PARQUET);
+ this.parquetUtils = FileFormatUtils.getInstance(HoodieFileFormat.PARQUET);
}
@Override
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowFileWriterFactory.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowFileWriterFactory.java
index 8e7287a7024..7ebcd1f39ff 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowFileWriterFactory.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowFileWriterFactory.java
@@ -34,6 +34,7 @@ import org.apache.spark.sql.types.StructType;
import java.io.IOException;
import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
+import static org.apache.hudi.common.util.ParquetUtils.getCompressionCodecName;
/**
* Factory to assist in instantiating a new {@link HoodieInternalRowFileWriter}.
@@ -76,7 +77,7 @@ public class HoodieInternalRowFileWriterFactory {
path,
new HoodieParquetConfig<>(
writeSupport,
- writeConfig.getParquetCompressionCodec(),
+ getCompressionCodecName(writeConfig.getParquetCompressionCodec()),
writeConfig.getParquetBlockSize(),
writeConfig.getParquetPageSize(),
writeConfig.getParquetMaxFileSize(),
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
index e62f22b0ad0..f115b7c7202 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
@@ -28,7 +28,7 @@ import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.testutils.InProcessTimeGenerator;
import org.apache.hudi.common.testutils.RawTripTestPayload;
-import org.apache.hudi.common.util.BaseFileUtils;
+import org.apache.hudi.common.util.FileFormatUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieUpsertException;
@@ -134,7 +134,7 @@ public class TestUpdateSchemaEvolution extends HoodieSparkClientTestHarness impl
Executable executable = () -> {
HoodieMergeHandle mergeHandle = new HoodieMergeHandle(updateTable.getConfig(), "101", updateTable, updateRecords.iterator(), updateRecords.get(0).getPartitionPath(), insertResult.getFileId(), supplier, Option.empty());
- List<GenericRecord> oldRecords = BaseFileUtils.getInstance(updateTable.getBaseFileFormat())
+ List<GenericRecord> oldRecords = FileFormatUtils.getInstance(updateTable.getBaseFileFormat())
.readAvroRecords(updateTable.getStorageConf(),
new StoragePath(updateTable.getConfig().getBasePath() + "/" + insertResult.getStat().getPath()),
mergeHandle.getWriterSchemaWithMetaFields());
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
index a40a3c4eaea..2a39f1eb5ae 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
@@ -75,9 +75,9 @@ import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.testutils.RawTripTestPayload;
-import org.apache.hudi.common.util.BaseFileUtils;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.common.util.FileFormatUtils;
import org.apache.hudi.common.util.MarkerUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
@@ -1199,7 +1199,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
dataGen = new HoodieTestDataGenerator(new String[] {testPartitionPath});
SparkRDDWriteClient client = getHoodieWriteClient(config);
- BaseFileUtils fileUtils = getFileUtilsInstance(metaClient);
+ FileFormatUtils fileUtils = getFileUtilsInstance(metaClient);
// Inserts => will write file1
String commitTime1 = "001";
@@ -1312,7 +1312,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
HoodieWriteConfig config = getSmallInsertWriteConfig(insertSplitLimit, false, mergeAllowDuplicateInserts); // hold upto 200 records max
dataGen = new HoodieTestDataGenerator(new String[] {testPartitionPath});
SparkRDDWriteClient client = getHoodieWriteClient(config);
- BaseFileUtils fileUtils = getFileUtilsInstance(metaClient);
+ FileFormatUtils fileUtils = getFileUtilsInstance(metaClient);
// Inserts => will write file1
String commitTime1 = "001";
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java
index d0891e70463..9b832bd6af6 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java
@@ -36,7 +36,7 @@ import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.testutils.RawTripTestPayload;
import org.apache.hudi.common.testutils.Transformations;
-import org.apache.hudi.common.util.BaseFileUtils;
+import org.apache.hudi.common.util.FileFormatUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieIndexConfig;
@@ -205,14 +205,14 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestBase implemen
// Read out the bloom filter and make sure filter can answer record exist or not
Path filePath = allFiles[0].getPath();
- BloomFilter filter = BaseFileUtils.getInstance(table.getBaseFileFormat())
+ BloomFilter filter = FileFormatUtils.getInstance(table.getBaseFileFormat())
.readBloomFilterFromMetadata(storageConf, new StoragePath(filePath.toUri()));
for (HoodieRecord record : records) {
assertTrue(filter.mightContain(record.getRecordKey()));
}
// Read the base file, check the record content
- List<GenericRecord> fileRecords = BaseFileUtils.getInstance(table.getBaseFileFormat())
+ List<GenericRecord> fileRecords = FileFormatUtils.getInstance(table.getBaseFileFormat())
.readAvroRecords(storageConf, new StoragePath(filePath.toUri()));
GenericRecord newRecord;
int index = 0;
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
index 2c4715174d4..f5792f3b92f 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
@@ -32,7 +32,7 @@ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.SyncableFileSystemView;
import org.apache.hudi.common.testutils.HoodieTestUtils;
-import org.apache.hudi.common.util.BaseFileUtils;
+import org.apache.hudi.common.util.FileFormatUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;
@@ -639,7 +639,7 @@ public class HoodieClientTestBase extends HoodieSparkClientTestHarness {
return hoodieCleanStatsTwo.stream().filter(e -> e.getPartitionPath().equals(partitionPath)).findFirst().orElse(null);
}
- public static BaseFileUtils getFileUtilsInstance(HoodieTableMetaClient metaClient) {
- return BaseFileUtils.getInstance(metaClient.getTableConfig().getBaseFileFormat());
+ public static FileFormatUtils getFileUtilsInstance(HoodieTableMetaClient metaClient) {
+ return FileFormatUtils.getInstance(metaClient.getTableConfig().getBaseFileFormat());
}
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java
index e8edc8b9142..5d75414c6ff 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java
@@ -18,7 +18,7 @@
package org.apache.hudi.common.model;
-import org.apache.hudi.common.util.BaseFileUtils;
+import org.apache.hudi.common.util.FileFormatUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.RetryHelper;
import org.apache.hudi.common.util.StringUtils;
@@ -137,7 +137,7 @@ public class HoodiePartitionMetadata {
HOODIE_PARTITION_METAFILE_PREFIX + "_" + UUID.randomUUID() + getMetafileExtension());
try {
// write to temporary file
- BaseFileUtils.getInstance(format).writeMetaFile(storage, tmpPath, props);
+ FileFormatUtils.getInstance(format).writeMetaFile(storage, tmpPath, props);
// move to actual path
storage.rename(tmpPath, filePath);
} finally {
@@ -185,7 +185,7 @@ public class HoodiePartitionMetadata {
private boolean readBaseFormatMetaFile() {
for (StoragePath metafilePath : baseFormatMetaFilePaths(partitionPath)) {
try {
- BaseFileUtils reader = BaseFileUtils.getInstance(metafilePath);
+ FileFormatUtils reader = FileFormatUtils.getInstance(metafilePath);
// Data file format
Map<String, String> metadata = reader.readFooter(
storage.getConf(), true, metafilePath, PARTITION_DEPTH_KEY, COMMIT_TIME_KEY);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
index b284fa4f881..aec5dc73ad6 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
@@ -32,7 +32,7 @@ import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.util.BaseFileUtils;
+import org.apache.hudi.common.util.FileFormatUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
@@ -306,7 +306,7 @@ public class TableSchemaResolver {
.orElseThrow(() -> new IllegalArgumentException("Could not find any data file written for compaction "
+ lastCompactionCommit + ", could not get schema for table " + metaClient.getBasePath()));
StoragePath path = new StoragePath(filePath);
- return BaseFileUtils.getInstance(path).readAvroSchema(metaClient.getStorageConf(), path);
+ return FileFormatUtils.getInstance(path).readAvroSchema(metaClient.getStorageConf(), path);
}
private Schema readSchemaFromLogFile(StoragePath path) throws IOException {
@@ -473,7 +473,7 @@ public class TableSchemaResolver {
// this is a log file
schema = readSchemaFromLogFile(filePath);
} else {
- schema = BaseFileUtils.getInstance(filePath).readAvroSchema(metaClient.getStorageConf(), filePath);
+ schema = FileFormatUtils.getInstance(filePath).readAvroSchema(metaClient.getStorageConf(), filePath);
}
}
return schema;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
index 4adbe3855fb..8df2ee4e6a6 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
@@ -25,11 +25,10 @@ import org.apache.hudi.common.engine.HoodieReaderContext;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
+import org.apache.hudi.common.util.FileFormatUtils;
import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.CloseableMappingIterator;
-import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.io.SeekableDataInputStream;
import org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase;
import org.apache.hudi.io.storage.HoodieFileReader;
@@ -43,29 +42,17 @@ import org.apache.hudi.storage.inline.InLineFSUtils;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.io.compress.Compression;
-import org.apache.hadoop.hbase.io.hfile.CacheConfig;
-import org.apache.hadoop.hbase.io.hfile.HFile;
-import org.apache.hadoop.hbase.io.hfile.HFileContext;
-import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
-import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Properties;
-import java.util.TreeMap;
import java.util.function.Supplier;
-import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
+import static org.apache.hudi.common.config.HoodieStorageConfig.HFILE_COMPRESSION_ALGORITHM_NAME;
import static org.apache.hudi.common.util.TypeUtils.unsafeCast;
import static org.apache.hudi.common.util.ValidationUtils.checkState;
@@ -75,10 +62,8 @@ import static org.apache.hudi.common.util.ValidationUtils.checkState;
*/
public class HoodieHFileDataBlock extends HoodieDataBlock {
private static final Logger LOG = LoggerFactory.getLogger(HoodieHFileDataBlock.class);
- private static final int DEFAULT_BLOCK_SIZE = 1024 * 1024;
- private static final String KV_COMPARATOR_CLASS_NAME = "org.apache.hudi.io.storage.HoodieHBaseKVComparator";
- private final Option<Compression.Algorithm> compressionAlgorithm;
+ private final Option<String> compressionCodec;
// This path is used for constructing HFile reader context, which should not be
// interpreted as the actual file path for the HFile data blocks
private final StoragePath pathForReader;
@@ -96,18 +81,18 @@ public class HoodieHFileDataBlock extends HoodieDataBlock {
boolean useNativeHFileReader) {
super(content, inputStreamSupplier, readBlockLazily, Option.of(logBlockContentLocation), readerSchema,
header, footer, HoodieAvroHFileReaderImplBase.KEY_FIELD_NAME, enablePointLookups);
- this.compressionAlgorithm = Option.empty();
+ this.compressionCodec = Option.empty();
this.pathForReader = pathForReader;
this.hFileReaderConfig = getHFileReaderConfig(useNativeHFileReader);
}
public HoodieHFileDataBlock(List<HoodieRecord> records,
Map<HeaderMetadataType, String> header,
- Compression.Algorithm compressionAlgorithm,
+ String compressionCodec,
StoragePath pathForReader,
boolean useNativeHFileReader) {
super(records, false, header, new HashMap<>(), HoodieAvroHFileReaderImplBase.KEY_FIELD_NAME);
- this.compressionAlgorithm = Option.of(compressionAlgorithm);
+ this.compressionCodec = Option.of(compressionCodec);
this.pathForReader = pathForReader;
this.hFileReaderConfig = getHFileReaderConfig(useNativeHFileReader);
}
@@ -119,70 +104,11 @@ public class HoodieHFileDataBlock extends HoodieDataBlock {
@Override
protected byte[] serializeRecords(List<HoodieRecord> records, StorageConfiguration<?> storageConf) throws IOException {
- HFileContext context = new HFileContextBuilder()
- .withBlockSize(DEFAULT_BLOCK_SIZE)
- .withCompression(compressionAlgorithm.get())
- .withCellComparator(ReflectionUtils.loadClass(KV_COMPARATOR_CLASS_NAME))
- .build();
-
- Configuration conf = storageConf.unwrapAs(Configuration.class);
- CacheConfig cacheConfig = new CacheConfig(conf);
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- FSDataOutputStream ostream = new FSDataOutputStream(baos, null);
-
- // Use simple incrementing counter as a key
- boolean useIntegerKey = !getRecordKey(records.get(0)).isPresent();
- // This is set here to avoid re-computing this in the loop
- int keyWidth = useIntegerKey ? (int) Math.ceil(Math.log(records.size())) + 1 : -1;
-
- // Serialize records into bytes
- Map<String, List<byte[]>> sortedRecordsMap = new TreeMap<>();
- // Get writer schema
- Schema writerSchema = new Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
-
- Iterator<HoodieRecord> itr = records.iterator();
- int id = 0;
- while (itr.hasNext()) {
- HoodieRecord<?> record = itr.next();
- String recordKey;
- if (useIntegerKey) {
- recordKey = String.format("%" + keyWidth + "s", id++);
- } else {
- recordKey = getRecordKey(record).get();
- }
-
- final byte[] recordBytes = serializeRecord(record, writerSchema);
- // If key exists in the map, append to its list. If not, create a new list.
- // Get the existing list of recordBytes for the recordKey, or an empty list if it doesn't exist
- List<byte[]> recordBytesList = sortedRecordsMap.getOrDefault(recordKey, new ArrayList<>());
- recordBytesList.add(recordBytes);
- // Put the updated list back into the map
- sortedRecordsMap.put(recordKey, recordBytesList);
- }
-
- HFile.Writer writer = HFile.getWriterFactory(conf, cacheConfig)
- .withOutputStream(ostream).withFileContext(context).create();
-
- // Write the records
- sortedRecordsMap.forEach((recordKey, recordBytesList) -> {
- for (byte[] recordBytes : recordBytesList) {
- try {
- KeyValue kv = new KeyValue(recordKey.getBytes(), null, null, recordBytes);
- writer.append(kv);
- } catch (IOException e) {
- throw new HoodieIOException("IOException serializing records", e);
- }
- }
- });
-
- writer.appendFileInfo(
- getUTF8Bytes(HoodieAvroHFileReaderImplBase.SCHEMA_KEY), getUTF8Bytes(getSchema().toString()));
-
- writer.close();
- ostream.flush();
- ostream.close();
-
- return baos.toByteArray();
+ Schema writerSchema = new Schema.Parser().parse(
+ super.getLogBlockHeader().get(HoodieLogBlock.HeaderMetadataType.SCHEMA));
+ return FileFormatUtils.getInstance(HoodieFileFormat.HFILE).serializeRecordsToLogBlock(
+ storageConf, records, writerSchema, getSchema(), getKeyFieldName(),
+ Collections.singletonMap(HFILE_COMPRESSION_ALGORITHM_NAME.key(), compressionCodec.get()));
}
@Override
@@ -241,15 +167,6 @@ public class HoodieHFileDataBlock extends HoodieDataBlock {
}
}
- private byte[] serializeRecord(HoodieRecord<?> record, Schema schema) throws IOException {
- Option<Schema.Field> keyField = getKeyField(schema);
- // Reset key value w/in the record to avoid duplicating the key w/in payload
- if (keyField.isPresent()) {
- record.truncateRecordKey(schema, new Properties(), keyField.get().name());
- }
- return HoodieAvroUtils.recordToBytes(record, schema).get();
- }
-
/**
* Print the record in json format
*/
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
index 53285b8d4a2..616177f80b6 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
@@ -18,38 +18,29 @@
package org.apache.hudi.common.table.log.block;
-import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.engine.HoodieReaderContext;
-import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
+import org.apache.hudi.common.util.FileFormatUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.io.SeekableDataInputStream;
-import org.apache.hudi.io.storage.HoodieFileWriter;
-import org.apache.hudi.io.storage.HoodieFileWriterFactory;
import org.apache.hudi.io.storage.HoodieIOFactory;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.inline.InLineFSUtils;
import org.apache.avro.Schema;
-import org.apache.parquet.hadoop.ParquetWriter;
-import org.apache.parquet.hadoop.metadata.CompressionCodecName;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
-import static org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_BLOCK_SIZE;
import static org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME;
import static org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION;
import static org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_DICTIONARY_ENABLED;
-import static org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_MAX_FILE_SIZE;
-import static org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_PAGE_SIZE;
import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
import static org.apache.hudi.common.util.ConfigUtils.DEFAULT_HUDI_CONFIG_FOR_READER;
@@ -58,7 +49,7 @@ import static org.apache.hudi.common.util.ConfigUtils.DEFAULT_HUDI_CONFIG_FOR_RE
*/
public class HoodieParquetDataBlock extends HoodieDataBlock {
- private final Option<CompressionCodecName> compressionCodecName;
+ private final Option<String> compressionCodecName;
private final Option<Double> expectedCompressionRatio;
private final Option<Boolean> useDictionaryEncoding;
@@ -81,7 +72,7 @@ public class HoodieParquetDataBlock extends HoodieDataBlock {
boolean shouldWriteRecordPositions,
Map<HeaderMetadataType, String> header,
String keyField,
- CompressionCodecName compressionCodecName,
+ String compressionCodecName,
double expectedCompressionRatio,
boolean useDictionaryEncoding
) {
@@ -99,29 +90,15 @@ public class HoodieParquetDataBlock extends HoodieDataBlock {
@Override
protected byte[] serializeRecords(List<HoodieRecord> records, StorageConfiguration<?> storageConf) throws IOException {
- if (records.size() == 0) {
- return new byte[0];
- }
-
- Schema writerSchema = new Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
- ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
- HoodieConfig config = new HoodieConfig();
- config.setValue(PARQUET_COMPRESSION_CODEC_NAME.key(), compressionCodecName.get().name());
- config.setValue(PARQUET_BLOCK_SIZE.key(), String.valueOf(ParquetWriter.DEFAULT_BLOCK_SIZE));
- config.setValue(PARQUET_PAGE_SIZE.key(), String.valueOf(ParquetWriter.DEFAULT_PAGE_SIZE));
- config.setValue(PARQUET_MAX_FILE_SIZE.key(), String.valueOf(1024 * 1024 * 1024));
- config.setValue(PARQUET_COMPRESSION_RATIO_FRACTION.key(), String.valueOf(expectedCompressionRatio.get()));
- config.setValue(PARQUET_DICTIONARY_ENABLED, String.valueOf(useDictionaryEncoding.get()));
- HoodieRecordType recordType = records.iterator().next().getRecordType();
- try (HoodieFileWriter parquetWriter = HoodieFileWriterFactory.getFileWriter(
- HoodieFileFormat.PARQUET, outputStream, storageConf, config, writerSchema, recordType)) {
- for (HoodieRecord<?> record : records) {
- String recordKey = getRecordKey(record).orElse(null);
- parquetWriter.write(recordKey, record, writerSchema);
- }
- outputStream.flush();
- }
- return outputStream.toByteArray();
+ Map<String, String> paramsMap = new HashMap<>();
+ paramsMap.put(PARQUET_COMPRESSION_CODEC_NAME.key(), compressionCodecName.get());
+ paramsMap.put(PARQUET_COMPRESSION_RATIO_FRACTION.key(), String.valueOf(expectedCompressionRatio.get()));
+ paramsMap.put(PARQUET_DICTIONARY_ENABLED.key(), String.valueOf(useDictionaryEncoding.get()));
+ Schema writerSchema = new Schema.Parser().parse(
+ super.getLogBlockHeader().get(HoodieLogBlock.HeaderMetadataType.SCHEMA));
+
+ return FileFormatUtils.getInstance(PARQUET).serializeRecordsToLogBlock(
+ storageConf, records, writerSchema, getSchema(), getKeyFieldName(), paramsMap);
}
/**
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/FileFormatUtils.java
similarity index 91%
rename from hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java
rename to hudi-common/src/main/java/org/apache/hudi/common/util/FileFormatUtils.java
index e64d4596d2a..44655a57fb9 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/FileFormatUtils.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.bloom.BloomFilterTypeCode;
import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
@@ -47,14 +48,14 @@ import java.util.Set;
import java.util.stream.Collectors;
/**
- * Utils for Hudi base file.
+ * Utils for file format used in Hudi.
*/
-public abstract class BaseFileUtils {
+public abstract class FileFormatUtils {
public static final String PARQUET_UTILS = "org.apache.hudi.common.util.ParquetUtils";
public static final String ORC_UTILS = "org.apache.hudi.common.util.OrcUtils";
public static final String HFILE_UTILS = "org.apache.hudi.common.util.HFileUtils";
- public static BaseFileUtils getInstance(StoragePath path) {
+ public static FileFormatUtils getInstance(StoragePath path) {
if (path.getFileExtension().equals(HoodieFileFormat.PARQUET.getFileExtension())) {
return ReflectionUtils.loadClass(PARQUET_UTILS);
} else if (path.getFileExtension().equals(HoodieFileFormat.ORC.getFileExtension())) {
@@ -65,7 +66,7 @@ public abstract class BaseFileUtils {
throw new UnsupportedOperationException("The format for file " + path + " is not supported yet.");
}
- public static BaseFileUtils getInstance(HoodieFileFormat fileFormat) {
+ public static FileFormatUtils getInstance(HoodieFileFormat fileFormat) {
if (HoodieFileFormat.PARQUET.equals(fileFormat)) {
return ReflectionUtils.loadClass(PARQUET_UTILS);
} else if (HoodieFileFormat.ORC.equals(fileFormat)) {
@@ -85,7 +86,7 @@ public abstract class BaseFileUtils {
ValidationUtils.checkArgument(!fileColumnRanges.isEmpty(), "fileColumnRanges should not be empty.");
// There are multiple files. Compute min(file_mins) and max(file_maxs)
return fileColumnRanges.stream()
- .reduce(BaseFileUtils::mergeRanges).orElseThrow(() -> new HoodieException("MergingColumnRanges failed."));
+ .reduce(FileFormatUtils::mergeRanges).orElseThrow(() -> new HoodieException("MergingColumnRanges failed."));
}
private static <T extends Comparable<T>> HoodieColumnRangeMetadata<T> mergeRanges(HoodieColumnRangeMetadata<T> one,
@@ -319,4 +320,22 @@ public abstract class BaseFileUtils {
public abstract void writeMetaFile(HoodieStorage storage,
StoragePath filePath,
Properties props) throws IOException;
+
+ /**
+ * Serializes Hudi records to the log block.
+ *
+ * @param storageConf storage configuration.
+ * @param records a list of {@link HoodieRecord}.
+ * @param writerSchema writer schema string from the log block header.
+ * @param readerSchema
+ * @param keyFieldName
+ * @param paramsMap additional params for serialization.
+ * @return byte array after serialization.
+ * @throws IOException upon serialization error.
+ */
+ public abstract byte[] serializeRecordsToLogBlock(StorageConfiguration<?> storageConf,
+ List<HoodieRecord> records,
+ Schema writerSchema,
+ Schema readerSchema, String keyFieldName,
+ Map<String, String> paramsMap) throws IOException;
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 1eff00a95da..6441f85c532 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -67,9 +67,9 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
-import org.apache.hudi.common.util.BaseFileUtils;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.ExternalFilePathUtil;
+import org.apache.hudi.common.util.FileFormatUtils;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.HoodieRecordUtils;
import org.apache.hudi.common.util.Option;
@@ -1212,7 +1212,7 @@ public class HoodieTableMetadataUtil {
try {
if (filePath.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
StoragePath fullFilePath = new StoragePath(datasetMetaClient.getBasePathV2(), filePath);
- return BaseFileUtils.getInstance(HoodieFileFormat.PARQUET)
+ return FileFormatUtils.getInstance(HoodieFileFormat.PARQUET)
.readColumnStatsFromMetadata(datasetMetaClient.getStorageConf(), fullFilePath, columnsToIndex);
}
@@ -1893,7 +1893,7 @@ public class HoodieTableMetadataUtil {
.collect(Collectors.groupingBy(HoodieColumnRangeMetadata::getColumnName, toList())); // Group by column name
// Step 3: Aggregate Column Ranges
Stream<HoodieColumnRangeMetadata<Comparable>> partitionStatsRangeMetadata = columnMetadataMap.entrySet().stream()
- .map(entry -> BaseFileUtils.getColumnRangeInPartition(entry.getValue()));
+ .map(entry -> FileFormatUtils.getColumnRangeInPartition(entry.getValue()));
return HoodieMetadataPayload.createPartitionStatsRecords(partitionPath, partitionStatsRangeMetadata.collect(toList()), false).iterator();
});
}
@@ -1957,7 +1957,7 @@ public class HoodieTableMetadataUtil {
.collect(Collectors.groupingBy(HoodieColumnRangeMetadata::getColumnName, toList())); // Group by column name
// Step 3: Aggregate Column Ranges
Stream<HoodieColumnRangeMetadata<Comparable>> partitionStatsRangeMetadata = columnMetadataMap.entrySet().stream()
- .map(entry -> BaseFileUtils.getColumnRangeInPartition(entry.getValue()));
+ .map(entry -> FileFormatUtils.getColumnRangeInPartition(entry.getValue()));
return HoodieMetadataPayload.createPartitionStatsRecords(partitionName, partitionStatsRangeMetadata.collect(toList()), false).iterator();
});
} catch (Exception e) {
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileSliceTestUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileSliceTestUtils.java
index 01052d4b00f..2a736709b42 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileSliceTestUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileSliceTestUtils.java
@@ -56,7 +56,6 @@ import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
@@ -70,6 +69,8 @@ import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import static org.apache.hudi.common.config.HoodieStorageConfig.HFILE_COMPRESSION_ALGORITHM_NAME;
+import static org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME;
import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType.DELETE_BLOCK;
import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType.PARQUET_DATA_BLOCK;
import static org.apache.hudi.common.testutils.FileCreateUtils.baseFileName;
@@ -198,7 +199,7 @@ public class HoodieFileSliceTestUtils {
return new HoodieHFileDataBlock(
records,
header,
- Compression.Algorithm.GZ,
+ HFILE_COMPRESSION_ALGORITHM_NAME.defaultValue(),
pathForReader,
HoodieReaderConfig.USE_NATIVE_HFILE_READER.defaultValue());
case PARQUET_DATA_BLOCK:
@@ -207,7 +208,7 @@ public class HoodieFileSliceTestUtils {
false,
header,
HoodieRecord.RECORD_KEY_METADATA_FIELD,
- CompressionCodecName.GZIP,
+ PARQUET_COMPRESSION_CODEC_NAME.defaultValue(),
0.1,
true);
default:
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestBaseFileUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestBaseFileUtils.java
index 080eaa6b09e..3be4ff9b43c 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestBaseFileUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestBaseFileUtils.java
@@ -40,7 +40,7 @@ public class TestBaseFileUtils {
"path/to/file2", "columnName", 3, 8, 1, 15, 120, 250);
List<HoodieColumnRangeMetadata<Comparable>> fileColumnRanges = Arrays.asList(fileColumnRange1, fileColumnRange2);
// Step 2: Call the Method
- HoodieColumnRangeMetadata<Comparable> result = BaseFileUtils.getColumnRangeInPartition(fileColumnRanges);
+ HoodieColumnRangeMetadata<Comparable> result = FileFormatUtils.getColumnRangeInPartition(fileColumnRanges);
// Step 3: Assertions
assertEquals(Integer.valueOf(1), new Integer(result.getMinValue().toString()));
assertEquals(Integer.valueOf(8), new Integer(result.getMaxValue().toString()));
@@ -60,7 +60,7 @@ public class TestBaseFileUtils {
List<HoodieColumnRangeMetadata<Comparable>> fileColumnRanges = Arrays.asList(fileColumnRange1, fileColumnRange2);
// Step 2: Call the Method
- HoodieColumnRangeMetadata<Comparable> result = BaseFileUtils.getColumnRangeInPartition(fileColumnRanges);
+ HoodieColumnRangeMetadata<Comparable> result = FileFormatUtils.getColumnRangeInPartition(fileColumnRanges);
// Step 3: Assertions
assertEquals(Integer.valueOf(1), new Integer(result.getMinValue().toString()));
assertEquals(Integer.valueOf(8), new Integer(result.getMaxValue().toString()));
@@ -79,6 +79,6 @@ public class TestBaseFileUtils {
"path/to/file2", "columnName2", null, 8, 1, 15, 120, 250);
List<HoodieColumnRangeMetadata<Comparable>> fileColumnRanges = Arrays.asList(fileColumnRange1, fileColumnRange2);
// Step 2: Call the Method
- assertThrows(IllegalArgumentException.class, () -> BaseFileUtils.getColumnRangeInPartition(fileColumnRanges));
+ assertThrows(IllegalArgumentException.class, () -> FileFormatUtils.getColumnRangeInPartition(fileColumnRanges));
}
}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
index b171560643b..ecd331079a1 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
@@ -30,7 +30,7 @@ import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.util.BaseFileUtils;
+import org.apache.hudi.common.util.FileFormatUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.ClosableIterator;
@@ -202,7 +202,7 @@ public class BootstrapOperator<I, O extends HoodieRecord<?>>
Option<HoodieInstant> latestCommitTime = commitsTimeline.filterCompletedAndCompactionInstants().lastInstant();
if (latestCommitTime.isPresent()) {
- BaseFileUtils fileUtils = BaseFileUtils.getInstance(this.hoodieTable.getBaseFileFormat());
+ FileFormatUtils fileUtils = FileFormatUtils.getInstance(this.hoodieTable.getBaseFileFormat());
Schema schema = new TableSchemaResolver(this.hoodieTable.getMetaClient()).getTableAvroSchema();
List<FileSlice> fileSlices = this.hoodieTable.getSliceView()
diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/HFileUtils.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/HFileUtils.java
index d42a15b9e7c..b746a92250e 100644
--- a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/HFileUtils.java
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/HFileUtils.java
@@ -19,6 +19,7 @@
package org.apache.hudi.common.util;
+import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieKey;
@@ -26,7 +27,10 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.io.compress.CompressionCodec;
+import org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase;
import org.apache.hudi.io.storage.HoodieFileReader;
+import org.apache.hudi.io.storage.HoodieHBaseKVComparator;
import org.apache.hudi.io.storage.HoodieIOFactory;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.storage.HoodieStorage;
@@ -35,21 +39,50 @@ import org.apache.hudi.storage.StoragePath;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
+import java.util.TreeMap;
+
+import static org.apache.hudi.common.config.HoodieStorageConfig.HFILE_COMPRESSION_ALGORITHM_NAME;
+import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
/**
* Utility functions for HFile files.
*/
-public class HFileUtils extends BaseFileUtils {
-
+public class HFileUtils extends FileFormatUtils {
private static final Logger LOG = LoggerFactory.getLogger(HFileUtils.class);
+ private static final int DEFAULT_BLOCK_SIZE_FOR_LOG_FILE = 1024 * 1024;
+
+ /**
+ * Gets the {@link Compression.Algorithm} Enum based on the {@link CompressionCodec} name.
+ *
+ * @param paramsMap parameter map containing the compression codec config.
+ * @return the {@link Compression.Algorithm} Enum.
+ */
+ public static Compression.Algorithm getHFileCompressionAlgorithm(Map<String, String> paramsMap) {
+ String algoName = paramsMap.get(HFILE_COMPRESSION_ALGORITHM_NAME.key());
+ if (StringUtils.isNullOrEmpty(algoName)) {
+ return Compression.Algorithm.GZ;
+ }
+ return Compression.Algorithm.valueOf(algoName.toUpperCase());
+ }
@Override
public List<GenericRecord> readAvroRecords(StorageConfiguration<?> configuration, StoragePath filePath) {
@@ -127,4 +160,89 @@ public class HFileUtils extends BaseFileUtils {
public void writeMetaFile(HoodieStorage storage, StoragePath filePath, Properties props) throws IOException {
throw new UnsupportedOperationException("HFileUtils does not support writeMetaFile");
}
+
+ @Override
+ public byte[] serializeRecordsToLogBlock(StorageConfiguration<?> storageConf,
+ List<HoodieRecord> records,
+ Schema writerSchema,
+ Schema readerSchema,
+ String keyFieldName,
+ Map<String, String> paramsMap) throws IOException {
+ Compression.Algorithm compressionAlgorithm = getHFileCompressionAlgorithm(paramsMap);
+ HFileContext context = new HFileContextBuilder()
+ .withBlockSize(DEFAULT_BLOCK_SIZE_FOR_LOG_FILE)
+ .withCompression(compressionAlgorithm)
+ .withCellComparator(new HoodieHBaseKVComparator())
+ .build();
+
+ Configuration conf = storageConf.unwrapAs(Configuration.class);
+ CacheConfig cacheConfig = new CacheConfig(conf);
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ FSDataOutputStream ostream = new FSDataOutputStream(baos, null);
+
+ // Use simple incrementing counter as a key
+ boolean useIntegerKey = !getRecordKey(records.get(0), readerSchema, keyFieldName).isPresent();
+ // This is set here to avoid re-computing this in the loop
+ int keyWidth = useIntegerKey ? (int) Math.ceil(Math.log(records.size())) + 1 : -1;
+
+ // Serialize records into bytes
+ Map<String, List<byte[]>> sortedRecordsMap = new TreeMap<>();
+
+ Iterator<HoodieRecord> itr = records.iterator();
+ int id = 0;
+ while (itr.hasNext()) {
+ HoodieRecord<?> record = itr.next();
+ String recordKey;
+ if (useIntegerKey) {
+ recordKey = String.format("%" + keyWidth + "s", id++);
+ } else {
+ recordKey = getRecordKey(record, readerSchema, keyFieldName).get();
+ }
+
+ final byte[] recordBytes = serializeRecord(record, writerSchema, keyFieldName);
+ // If key exists in the map, append to its list. If not, create a new list.
+ // Get the existing list of recordBytes for the recordKey, or an empty list if it doesn't exist
+ List<byte[]> recordBytesList = sortedRecordsMap.getOrDefault(recordKey, new ArrayList<>());
+ recordBytesList.add(recordBytes);
+ // Put the updated list back into the map
+ sortedRecordsMap.put(recordKey, recordBytesList);
+ }
+
+ HFile.Writer writer = HFile.getWriterFactory(conf, cacheConfig)
+ .withOutputStream(ostream).withFileContext(context).create();
+
+ // Write the records
+ sortedRecordsMap.forEach((recordKey, recordBytesList) -> {
+ for (byte[] recordBytes : recordBytesList) {
+ try {
+ KeyValue kv = new KeyValue(recordKey.getBytes(), null, null, recordBytes);
+ writer.append(kv);
+ } catch (IOException e) {
+ throw new HoodieIOException("IOException serializing records", e);
+ }
+ }
+ });
+
+ writer.appendFileInfo(
+ getUTF8Bytes(HoodieAvroHFileReaderImplBase.SCHEMA_KEY), getUTF8Bytes(readerSchema.toString()));
+
+ writer.close();
+ ostream.flush();
+ ostream.close();
+
+ return baos.toByteArray();
+ }
+
+ private static Option<String> getRecordKey(HoodieRecord record, Schema readerSchema, String keyFieldName) {
+ return Option.ofNullable(record.getRecordKey(readerSchema, keyFieldName));
+ }
+
+ private static byte[] serializeRecord(HoodieRecord<?> record, Schema schema, String keyFieldName) throws IOException {
+ Option<Schema.Field> keyField = Option.ofNullable(schema.getField(keyFieldName));
+ // Reset key value w/in the record to avoid duplicating the key w/in payload
+ if (keyField.isPresent()) {
+ record.truncateRecordKey(schema, new Properties(), keyField.get().name());
+ }
+ return HoodieAvroUtils.recordToBytes(record, schema).get();
+ }
}
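For context, here is a minimal usage sketch of the new HFile serialization entry point. The method signature, the compression config key, and the record-key field constant all come from the hunk above; storageConf, records, writerSchema, and readerSchema are assumed to be in scope at the caller (presumably the HFile log data block, which this commit refactors to delegate here). This is illustrative only, not part of the commit.

    // Illustrative only: serialize a batch of records into HFile-format log-block bytes,
    // keyed and sorted by record key, compressed with the configured algorithm.
    Map<String, String> params = Collections.singletonMap(
        HoodieStorageConfig.HFILE_COMPRESSION_ALGORITHM_NAME.key(), "GZ");
    byte[] hfileBytes = new HFileUtils().serializeRecordsToLogBlock(
        storageConf, records, writerSchema, readerSchema,
        HoodieRecord.RECORD_KEY_METADATA_FIELD, params);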
diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java
index 6bbae77d4b9..31ff1233289 100644
--- a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java
@@ -69,7 +69,7 @@ import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToHadoopPath;
/**
* Utility functions for ORC files.
*/
-public class OrcUtils extends BaseFileUtils {
+public class OrcUtils extends FileFormatUtils {
/**
* Provides a closable iterator for reading the given ORC file.
@@ -310,4 +310,13 @@ public class OrcUtils extends BaseFileUtils {
}
}
}
+
+ @Override
+ public byte[] serializeRecordsToLogBlock(StorageConfiguration<?> storageConf,
+ List<HoodieRecord> records,
+ Schema writerSchema,
+ Schema readerSchema, String keyFieldName,
+ Map<String, String> paramsMap) throws IOException {
+ throw new UnsupportedOperationException("Hudi log blocks do not support ORC format yet");
+ }
}
diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
index e31e610c7d7..76b3aaf058f 100644
--- a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
@@ -21,6 +21,7 @@ package org.apache.hudi.common.util;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.HoodieAvroWriteSupport;
+import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieKey;
@@ -29,6 +30,8 @@ import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.MetadataNotFoundException;
+import org.apache.hudi.io.storage.HoodieFileWriter;
+import org.apache.hudi.io.storage.HoodieFileWriterFactory;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.HoodieStorageUtils;
@@ -60,6 +63,7 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.math.BigDecimal;
import java.math.BigInteger;
@@ -75,10 +79,14 @@ import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_BLOCK_SIZE;
+import static org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_MAX_FILE_SIZE;
+import static org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_PAGE_SIZE;
+
/**
* Utility functions involving with parquet.
*/
-public class ParquetUtils extends BaseFileUtils {
+public class ParquetUtils extends FileFormatUtils {
private static final Logger LOG = LoggerFactory.getLogger(ParquetUtils.class);
@@ -152,6 +160,14 @@ public class ParquetUtils extends BaseFileUtils {
return rowKeys;
}
+ /**
+ * @param codecName the codec name as a String.
+ * @return the {@link CompressionCodecName} Enum.
+ */
+ public static CompressionCodecName getCompressionCodecName(String codecName) {
+ return CompressionCodecName.fromConf(StringUtils.isNullOrEmpty(codecName) ? null : codecName);
+ }
+
/**
* Fetch {@link HoodieKey}s with row positions from the given parquet file.
*
@@ -366,6 +382,35 @@ public class ParquetUtils extends BaseFileUtils {
}
}
+ @Override
+ public byte[] serializeRecordsToLogBlock(StorageConfiguration<?> storageConf,
+ List<HoodieRecord> records,
+ Schema writerSchema,
+ Schema readerSchema,
+ String keyFieldName,
+ Map<String, String> paramsMap) throws IOException {
+ if (records.size() == 0) {
+ return new byte[0];
+ }
+
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ HoodieConfig config = new HoodieConfig();
+ paramsMap.entrySet().stream().forEach(entry -> config.setValue(entry.getKey(), entry.getValue()));
+ config.setValue(PARQUET_BLOCK_SIZE.key(), String.valueOf(ParquetWriter.DEFAULT_BLOCK_SIZE));
+ config.setValue(PARQUET_PAGE_SIZE.key(), String.valueOf(ParquetWriter.DEFAULT_PAGE_SIZE));
+ config.setValue(PARQUET_MAX_FILE_SIZE.key(), String.valueOf(1024 * 1024 * 1024));
+ HoodieRecord.HoodieRecordType recordType = records.iterator().next().getRecordType();
+ try (HoodieFileWriter parquetWriter = HoodieFileWriterFactory.getFileWriter(
+ HoodieFileFormat.PARQUET, outputStream, storageConf, config, writerSchema, recordType)) {
+ for (HoodieRecord<?> record : records) {
+ String recordKey = record.getRecordKey(readerSchema, keyFieldName);
+ parquetWriter.write(recordKey, record, writerSchema);
+ }
+ outputStream.flush();
+ }
+ return outputStream.toByteArray();
+ }
+
static class RecordKeysFilterFunction implements Function<String, Boolean> {
private final Set<String> candidateKeys;
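Analogously, a hedged sketch of driving the Parquet path above: only the serializeRecordsToLogBlock signature, the getCompressionCodecName helper, and the config key are taken from this diff; the surrounding variables (storageConf, records, schemas) are assumed to exist in the caller.

    // Illustrative only: resolve a codec name and serialize records into Parquet log-block bytes.
    CompressionCodecName codec = ParquetUtils.getCompressionCodecName("gzip");
    Map<String, String> params = Collections.singletonMap(
        HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME.key(), codec.name().toLowerCase());
    byte[] parquetBytes = new ParquetUtils().serializeRecordsToLogBlock(
        storageConf, records, writerSchema, readerSchema,
        HoodieRecord.RECORD_KEY_METADATA_FIELD, params);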
diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroOrcReader.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroOrcReader.java
index 917b8a1a627..424be01b044 100644
--- a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroOrcReader.java
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroOrcReader.java
@@ -22,7 +22,7 @@ package org.apache.hudi.io.hadoop;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.util.AvroOrcUtils;
-import org.apache.hudi.common.util.BaseFileUtils;
+import org.apache.hudi.common.util.FileFormatUtils;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieIOException;
@@ -53,12 +53,12 @@ public class HoodieAvroOrcReader extends HoodieAvroFileReader {
private final StoragePath path;
private final StorageConfiguration<?> conf;
- private final BaseFileUtils orcUtils;
+ private final FileFormatUtils orcUtils;
public HoodieAvroOrcReader(StorageConfiguration<?> configuration, StoragePath path) {
this.conf = configuration;
this.path = path;
- this.orcUtils = BaseFileUtils.getInstance(HoodieFileFormat.ORC);
+ this.orcUtils = FileFormatUtils.getInstance(HoodieFileFormat.ORC);
}
@Override
diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReader.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReader.java
index d75660a9a7e..6736379b4df 100644
--- a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReader.java
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReader.java
@@ -24,7 +24,7 @@ import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.util.BaseFileUtils;
+import org.apache.hudi.common.util.FileFormatUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ParquetReaderIterator;
import org.apache.hudi.common.util.collection.ClosableIterator;
@@ -59,7 +59,7 @@ public class HoodieAvroParquetReader extends HoodieAvroFileReader {
private final StoragePath path;
private final StorageConfiguration<?> conf;
- private final BaseFileUtils parquetUtils;
+ private final FileFormatUtils parquetUtils;
private final List<ParquetReaderIterator> readerIterators = new ArrayList<>();
public HoodieAvroParquetReader(StorageConfiguration<?> storageConf, StoragePath path) {
@@ -67,7 +67,7 @@ public class HoodieAvroParquetReader extends HoodieAvroFileReader {
// by the Reader (for proper config propagation to Parquet components)
this.conf = tryOverrideDefaultConfigs(storageConf.newInstance());
this.path = path;
- this.parquetUtils = BaseFileUtils.getInstance(HoodieFileFormat.PARQUET);
+ this.parquetUtils = FileFormatUtils.getInstance(HoodieFileFormat.PARQUET);
}
@Override
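Both readers above now resolve their format helper through FileFormatUtils.getInstance, the factory that replaces BaseFileUtils.getInstance in this commit. A hedged sketch of the dispatch those call sites rely on follows; the real mapping lives in FileFormatUtils.java (renamed from BaseFileUtils.java) and may cover additional formats, so the helper name and case list here are illustrative only.

    // Illustrative dispatch only, mirroring the getInstance(...) calls used by the readers above.
    static FileFormatUtils getFormatUtils(HoodieFileFormat format) {
      switch (format) {
        case PARQUET:
          return new ParquetUtils();
        case ORC:
          return new OrcUtils();
        default:
          throw new UnsupportedOperationException(format.name() + " format is not supported yet");
      }
    }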
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
index 792d28550e8..6db88b76005 100755
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
@@ -77,8 +77,6 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.io.compress.Compression;
-import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.hadoop.util.counters.BenchmarkCounter;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
@@ -114,6 +112,8 @@ import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static org.apache.hudi.common.config.HoodieStorageConfig.HFILE_COMPRESSION_ALGORITHM_NAME;
+import static org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME;
import static org.apache.hudi.common.testutils.HoodieTestUtils.getJavaVersion;
import static org.apache.hudi.common.testutils.HoodieTestUtils.shouldUseExternalHdfs;
import static org.apache.hudi.common.testutils.HoodieTestUtils.useExternalHdfs;
@@ -2775,9 +2775,9 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
case AVRO_DATA_BLOCK:
return new HoodieAvroDataBlock(records, false, header, HoodieRecord.RECORD_KEY_METADATA_FIELD);
case HFILE_DATA_BLOCK:
- return new HoodieHFileDataBlock(records, header, Compression.Algorithm.GZ, pathForReader, HoodieReaderConfig.USE_NATIVE_HFILE_READER.defaultValue());
+ return new HoodieHFileDataBlock(records, header, HFILE_COMPRESSION_ALGORITHM_NAME.defaultValue(), pathForReader, HoodieReaderConfig.USE_NATIVE_HFILE_READER.defaultValue());
case PARQUET_DATA_BLOCK:
- return new HoodieParquetDataBlock(records, false, header, HoodieRecord.RECORD_KEY_METADATA_FIELD, CompressionCodecName.GZIP, 0.1, true);
+ return new HoodieParquetDataBlock(records, false, header, HoodieRecord.RECORD_KEY_METADATA_FIELD, PARQUET_COMPRESSION_CODEC_NAME.defaultValue(), 0.1, true);
default:
throw new RuntimeException("Unknown data block type " + dataBlockType);
}
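The test now hands the data blocks codec names as strings (the config defaults) instead of hard-coded enums; downstream, those strings can be resolved back to concrete codecs with the helpers introduced above. A short sketch of that resolution, reusing the static imports already present in this test file; everything else is illustrative.

    // Illustrative resolution of string codec names back to concrete codec enums.
    Map<String, String> hfileParams = Collections.singletonMap(
        HFILE_COMPRESSION_ALGORITHM_NAME.key(), HFILE_COMPRESSION_ALGORITHM_NAME.defaultValue());
    Compression.Algorithm hfileAlgo = HFileUtils.getHFileCompressionAlgorithm(hfileParams);
    CompressionCodecName parquetCodec =
        ParquetUtils.getCompressionCodecName(PARQUET_COMPRESSION_CODEC_NAME.defaultValue());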
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestHFileUtils.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestHFileUtils.java
new file mode 100644
index 00000000000..c88dced4ab3
--- /dev/null
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestHFileUtils.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+import java.util.Collections;
+import java.util.Map;
+
+import static org.apache.hudi.common.config.HoodieStorageConfig.HFILE_COMPRESSION_ALGORITHM_NAME;
+import static org.apache.hudi.common.util.HFileUtils.getHFileCompressionAlgorithm;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Tests {@link HFileUtils}
+ */
+public class TestHFileUtils {
+ @ParameterizedTest
+ @EnumSource(Compression.Algorithm.class)
+ public void testGetHFileCompressionAlgorithm(Compression.Algorithm algo) {
+ for (boolean upperCase : new boolean[] {true, false}) {
+ Map<String, String> paramsMap = Collections.singletonMap(
+ HFILE_COMPRESSION_ALGORITHM_NAME.key(),
+ upperCase ? algo.getName().toUpperCase() : algo.getName().toLowerCase());
+ assertEquals(algo, getHFileCompressionAlgorithm(paramsMap));
+ }
+ }
+
+ @Test
+ public void testGetHFileCompressionAlgorithmWithEmptyString() {
+ assertEquals(Compression.Algorithm.GZ, getHFileCompressionAlgorithm(
+ Collections.singletonMap(HFILE_COMPRESSION_ALGORITHM_NAME.key(), "")));
+ }
+
+ @Test
+ public void testGetDefaultHFileCompressionAlgorithm() {
+ assertEquals(Compression.Algorithm.GZ, getHFileCompressionAlgorithm(Collections.emptyMap()));
+ }
+}
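The parameterized case above checks both upper- and lower-case input because HBase's Compression.Algorithm constants are assumed to report lower-case names via getName() while Enum.valueOf expects the upper-case constant, which is why the helper upper-cases its argument. A tiny illustration of that assumed behavior, not part of the commit:

    // Assumed HBase behavior, shown only to motivate the toUpperCase() in the helper.
    Compression.Algorithm gz = Compression.Algorithm.GZ;
    String reported = gz.getName();                                          // expected: "gz"
    Compression.Algorithm parsed = Compression.Algorithm.valueOf(reported.toUpperCase());
    assertEquals(gz, parsed);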
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
index dbee36338c7..1e08979cc55 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
@@ -50,12 +50,10 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
-import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.mapred.JobConf;
import org.apache.parquet.avro.AvroParquetWriter;
-import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import java.io.File;
import java.io.IOException;
@@ -71,6 +69,9 @@ import java.util.Objects;
import java.util.UUID;
import java.util.stream.Collectors;
+import static org.apache.hudi.common.config.HoodieStorageConfig.HFILE_COMPRESSION_ALGORITHM_NAME;
+import static org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME;
+
public class InputFormatTestUtil {
private static String TEST_WRITE_TOKEN = "1-0-1";
@@ -421,10 +422,10 @@ public class InputFormatTestUtil {
List<HoodieRecord> hoodieRecords = records.stream().map(HoodieAvroIndexedRecord::new).collect(Collectors.toList());
if (logBlockType == HoodieLogBlock.HoodieLogBlockType.HFILE_DATA_BLOCK) {
dataBlock = new HoodieHFileDataBlock(
- hoodieRecords, header, Compression.Algorithm.GZ, writer.getLogFile().getPath(), HoodieReaderConfig.USE_NATIVE_HFILE_READER.defaultValue());
+ hoodieRecords, header, HFILE_COMPRESSION_ALGORITHM_NAME.defaultValue(), writer.getLogFile().getPath(), HoodieReaderConfig.USE_NATIVE_HFILE_READER.defaultValue());
} else if (logBlockType == HoodieLogBlock.HoodieLogBlockType.PARQUET_DATA_BLOCK) {
dataBlock = new HoodieParquetDataBlock(hoodieRecords, false, header,
- HoodieRecord.RECORD_KEY_METADATA_FIELD, CompressionCodecName.GZIP, 0.1, true);
+ HoodieRecord.RECORD_KEY_METADATA_FIELD, PARQUET_COMPRESSION_CODEC_NAME.defaultValue(), 0.1, true);
} else {
dataBlock = new HoodieAvroDataBlock(hoodieRecords, false, header,
HoodieRecord.RECORD_KEY_METADATA_FIELD);
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/SparkHelpers.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/SparkHelpers.scala
index 791435f4bb7..c2a717e2764 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/SparkHelpers.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/SparkHelpers.scala
@@ -23,14 +23,14 @@ import org.apache.hudi.common.bloom.{BloomFilter, BloomFilterFactory}
import org.apache.hudi.common.config.HoodieStorageConfig
import org.apache.hudi.common.config.HoodieStorageConfig.{BLOOM_FILTER_DYNAMIC_MAX_ENTRIES, BLOOM_FILTER_FPP_VALUE, BLOOM_FILTER_NUM_ENTRIES_VALUE, BLOOM_FILTER_TYPE}
import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord}
-import org.apache.hudi.common.util.{BaseFileUtils, Option}
+import org.apache.hudi.common.util.{FileFormatUtils, Option}
+import org.apache.hudi.io.hadoop.HoodieAvroParquetWriter
import org.apache.hudi.io.storage.HoodieParquetConfig
import org.apache.hudi.storage.{HoodieStorage, StorageConfiguration, StoragePath}
import org.apache.avro.Schema
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
-import org.apache.hudi.io.hadoop.HoodieAvroParquetWriter
import org.apache.parquet.avro.AvroSchemaConverter
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.apache.spark.sql.{DataFrame, SQLContext}
@@ -48,7 +48,7 @@ object SparkHelpers {
sourceFile: StoragePath,
destinationFile: StoragePath,
keysToSkip: Set[String]) {
- val sourceRecords = BaseFileUtils.getInstance(HoodieFileFormat.PARQUET).readAvroRecords(conf, sourceFile).asScala
+ val sourceRecords = FileFormatUtils.getInstance(HoodieFileFormat.PARQUET).readAvroRecords(conf, sourceFile).asScala
val schema: Schema = sourceRecords.head.getSchema
val filter: BloomFilter = BloomFilterFactory.createBloomFilter(
BLOOM_FILTER_NUM_ENTRIES_VALUE.defaultValue.toInt, BLOOM_FILTER_FPP_VALUE.defaultValue.toDouble,
@@ -140,7 +140,7 @@ class SparkHelper(sqlContext: SQLContext, fs: FileSystem) {
* @return
*/
def fileKeysAgainstBF(conf: StorageConfiguration[_], sqlContext: SQLContext, file: String): Boolean = {
- val bf = BaseFileUtils.getInstance(HoodieFileFormat.PARQUET).readBloomFilterFromMetadata(conf, new StoragePath(file))
+ val bf = FileFormatUtils.getInstance(HoodieFileFormat.PARQUET).readBloomFilterFromMetadata(conf, new StoragePath(file))
val foundCount = sqlContext.parquetFile(file)
.select(s"`${HoodieRecord.RECORD_KEY_METADATA_FIELD}`")
.collect().count(r => !bf.mightContain(r.getString(0)))
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/ColumnStatsIndexHelper.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/ColumnStatsIndexHelper.java
index 11abebbb245..8ff46be7621 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/ColumnStatsIndexHelper.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/ColumnStatsIndexHelper.java
@@ -19,7 +19,7 @@ package org.apache.hudi;
import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
import org.apache.hudi.common.model.HoodieFileFormat;
-import org.apache.hudi.common.util.BaseFileUtils;
+import org.apache.hudi.common.util.FileFormatUtils;
import org.apache.hudi.common.util.ParquetUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
@@ -174,7 +174,7 @@ public class ColumnStatsIndexHelper {
colMinMaxInfos =
jsc.parallelize(baseFilesPaths, numParallelism)
.mapPartitions(paths -> {
- ParquetUtils utils = (ParquetUtils) BaseFileUtils.getInstance(HoodieFileFormat.PARQUET);
+ ParquetUtils utils = (ParquetUtils) FileFormatUtils.getInstance(HoodieFileFormat.PARQUET);
Iterable<String> iterable = () -> paths;
return StreamSupport.stream(iterable.spliterator(), false)
.flatMap(path ->
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
index 55f0693d2c2..b9e72ed711d 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
@@ -49,9 +49,9 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.FileSystemViewManager;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
-import org.apache.hudi.common.util.BaseFileUtils;
import org.apache.hudi.common.util.CleanerUtils;
import org.apache.hudi.common.util.ConfigUtils;
+import org.apache.hudi.common.util.FileFormatUtils;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.VisibleForTesting;
@@ -64,9 +64,9 @@ import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
-import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.utilities.util.BloomFilterData;
import com.beust.jcommander.JCommander;
@@ -1438,7 +1438,7 @@ public class HoodieMetadataTableValidator implements Serializable {
.collect(Collectors.toList());
} else {
return baseFileNameList.stream().flatMap(filename ->
- BaseFileUtils.getInstance(HoodieFileFormat.PARQUET).readColumnStatsFromMetadata(
+ FileFormatUtils.getInstance(HoodieFileFormat.PARQUET).readColumnStatsFromMetadata(
metaClient.getStorageConf(),
new StoragePath(FSUtils.constructAbsolutePath(metaClient.getBasePathV2(), partitionPath), filename),
allColumnNameList).stream())