This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new a8da2fdd1c0 [HUDI-7752] Abstract serializeRecords for log writing (#11210)
a8da2fdd1c0 is described below

commit a8da2fdd1c08cbd86d99d48cc283f03f1fc2090d
Author: Y Ethan Guo <[email protected]>
AuthorDate: Tue May 14 11:57:15 2024 -0700

    [HUDI-7752] Abstract serializeRecords for log writing (#11210)
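    This change renames BaseFileUtils to FileFormatUtils and moves the
    format-specific serialization of records for log data blocks out of
    HoodieHFileDataBlock and HoodieParquetDataBlock into a new abstract
    method, FileFormatUtils#serializeRecordsToLogBlock, implemented by the
    format-specific utils such as HFileUtils and ParquetUtils. Compression
    settings are now passed as config strings in a params map rather than
    as HBase/Parquet enum types, so the log block classes no longer import
    those writer-side classes directly.

    A rough sketch of the new call path (the surrounding variables are
    illustrative only, not part of this patch): a data block resolves the
    utils instance for its format and delegates serialization, e.g.

        Schema writerSchema = new Schema.Parser().parse(
            getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
        Map<String, String> params = Collections.singletonMap(
            PARQUET_COMPRESSION_CODEC_NAME.key(), "gzip");
        byte[] content = FileFormatUtils.getInstance(HoodieFileFormat.PARQUET)
            .serializeRecordsToLogBlock(
                storageConf, records, writerSchema, getSchema(),
                getKeyFieldName(), params);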
---
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  11 +-
 .../hudi/io/HoodieKeyLocationFetchHandle.java      |   8 +-
 .../row/HoodieRowDataFileWriterFactory.java        |   3 +-
 .../client/TestHoodieJavaWriteClientInsert.java    |   6 +-
 .../commit/TestJavaCopyOnWriteActionExecutor.java  |   6 +-
 .../testutils/HoodieJavaClientTestHarness.java     |  12 +-
 .../hudi/io/storage/HoodieSparkParquetReader.java  |   6 +-
 .../row/HoodieInternalRowFileWriterFactory.java    |   3 +-
 .../hudi/client/TestUpdateSchemaEvolution.java     |   4 +-
 .../TestHoodieClientOnCopyOnWriteStorage.java      |   6 +-
 .../commit/TestCopyOnWriteActionExecutor.java      |   6 +-
 .../hudi/testutils/HoodieClientTestBase.java       |   6 +-
 .../hudi/common/model/HoodiePartitionMetadata.java |   6 +-
 .../hudi/common/table/TableSchemaResolver.java     |   6 +-
 .../table/log/block/HoodieHFileDataBlock.java      | 107 ++----------------
 .../table/log/block/HoodieParquetDataBlock.java    |  47 ++------
 .../{BaseFileUtils.java => FileFormatUtils.java}   |  29 ++++-
 .../hudi/metadata/HoodieTableMetadataUtil.java     |   8 +-
 .../testutils/reader/HoodieFileSliceTestUtils.java |   7 +-
 .../apache/hudi/common/util/TestBaseFileUtils.java |   6 +-
 .../hudi/sink/bootstrap/BootstrapOperator.java     |   4 +-
 .../org/apache/hudi/common/util/HFileUtils.java    | 122 ++++++++++++++++++++-
 .../java/org/apache/hudi/common/util/OrcUtils.java |  11 +-
 .../org/apache/hudi/common/util/ParquetUtils.java  |  47 +++++++-
 .../apache/hudi/io/hadoop/HoodieAvroOrcReader.java |   6 +-
 .../hudi/io/hadoop/HoodieAvroParquetReader.java    |   6 +-
 .../common/functional/TestHoodieLogFormat.java     |   8 +-
 .../apache/hudi/common/util/TestHFileUtils.java    |  59 ++++++++++
 .../hudi/hadoop/testutils/InputFormatTestUtil.java |   9 +-
 .../org/apache/spark/sql/hudi/SparkHelpers.scala   |   8 +-
 .../org/apache/hudi/ColumnStatsIndexHelper.java    |   4 +-
 .../utilities/HoodieMetadataTableValidator.java    |   6 +-
 32 files changed, 364 insertions(+), 219 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index fe479549d7d..2248ce03f7a 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -84,9 +84,7 @@ import org.apache.hudi.table.action.compact.CompactionTriggerStrategy;
 import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
 import org.apache.hudi.table.storage.HoodieStorageLayout;
 
-import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.orc.CompressionKind;
-import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -2137,9 +2135,8 @@ public class HoodieWriteConfig extends HoodieConfig {
     return getDouble(HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION);
   }
 
-  public CompressionCodecName getParquetCompressionCodec() {
-    String codecName = getString(HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME);
-    return CompressionCodecName.fromConf(StringUtils.isNullOrEmpty(codecName) ? null : codecName);
+  public String getParquetCompressionCodec() {
+    return getString(HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME);
   }
 
   public boolean parquetDictionaryEnabled() {
@@ -2183,8 +2180,8 @@ public class HoodieWriteConfig extends HoodieConfig {
     return getInt(HoodieStorageConfig.HFILE_BLOCK_SIZE);
   }
 
-  public Compression.Algorithm getHFileCompressionAlgorithm() {
-    return Compression.Algorithm.valueOf(getString(HoodieStorageConfig.HFILE_COMPRESSION_ALGORITHM_NAME));
+  public String getHFileCompressionAlgorithm() {
+    return getString(HoodieStorageConfig.HFILE_COMPRESSION_ALGORITHM_NAME);
   }
 
   public long getOrcMaxFileSize() {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java
index f05a0af3449..734a012d4bb 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java
@@ -22,7 +22,7 @@ import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
 import org.apache.hudi.common.model.HoodieRecordLocation;
-import org.apache.hudi.common.util.BaseFileUtils;
+import org.apache.hudi.common.util.FileFormatUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -50,11 +50,11 @@ public class HoodieKeyLocationFetchHandle<T, I, K, O> extends HoodieReadHandle<T
   }
 
   private List<Pair<HoodieKey, Long>> fetchRecordKeysWithPositions(HoodieBaseFile baseFile) {
-    BaseFileUtils baseFileUtils = BaseFileUtils.getInstance(baseFile.getStoragePath());
+    FileFormatUtils fileFormatUtils = FileFormatUtils.getInstance(baseFile.getStoragePath());
     if (keyGeneratorOpt.isPresent()) {
-      return baseFileUtils.fetchRecordKeysWithPositions(hoodieTable.getStorageConf(), baseFile.getStoragePath(), keyGeneratorOpt);
+      return fileFormatUtils.fetchRecordKeysWithPositions(hoodieTable.getStorageConf(), baseFile.getStoragePath(), keyGeneratorOpt);
     } else {
-      return baseFileUtils.fetchRecordKeysWithPositions(hoodieTable.getStorageConf(), baseFile.getStoragePath());
+      return fileFormatUtils.fetchRecordKeysWithPositions(hoodieTable.getStorageConf(), baseFile.getStoragePath());
     }
     }
   }
 
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriterFactory.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriterFactory.java
index be757a30954..8d2a87a5110 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriterFactory.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriterFactory.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.fs.Path;
 import java.io.IOException;
 
 import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
+import static org.apache.hudi.common.util.ParquetUtils.getCompressionCodecName;
 import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePath;
 
 /**
@@ -73,7 +74,7 @@ public class HoodieRowDataFileWriterFactory {
     return new HoodieRowDataParquetWriter(
         convertToStoragePath(path), new HoodieParquetConfig<>(
         writeSupport,
-        writeConfig.getParquetCompressionCodec(),
+        getCompressionCodecName(writeConfig.getParquetCompressionCodec()),
         writeConfig.getParquetBlockSize(),
         writeConfig.getParquetPageSize(),
         writeConfig.getParquetMaxFileSize(),
diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestHoodieJavaWriteClientInsert.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestHoodieJavaWriteClientInsert.java
index 5a9d9dc2688..c5be86e7405 100644
--- a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestHoodieJavaWriteClientInsert.java
+++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestHoodieJavaWriteClientInsert.java
@@ -31,7 +31,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.testutils.RawTripTestPayload;
-import org.apache.hudi.common.util.BaseFileUtils;
+import org.apache.hudi.common.util.FileFormatUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -147,7 +147,7 @@ public class TestHoodieJavaWriteClientInsert extends HoodieJavaClientTestHarness
 
     HoodieJavaWriteClient writeClient = getHoodieWriteClient(config);
     metaClient = HoodieTableMetaClient.reload(metaClient);
-    BaseFileUtils fileUtils = getFileUtilsInstance(metaClient);
+    FileFormatUtils fileUtils = getFileUtilsInstance(metaClient);
 
     // Get some records belong to the same partition (2021/09/11)
     String insertRecordStr1 = "{\"_row_key\":\"1\","
@@ -221,7 +221,7 @@ public class TestHoodieJavaWriteClientInsert extends HoodieJavaClientTestHarness
 
     HoodieJavaWriteClient writeClient = getHoodieWriteClient(config);
     metaClient = HoodieTableMetaClient.reload(metaClient);
-    BaseFileUtils fileUtils = getFileUtilsInstance(metaClient);
+    FileFormatUtils fileUtils = getFileUtilsInstance(metaClient);
 
     String partitionPath = "2021/09/11";
     HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new String[]{partitionPath});
diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/table/action/commit/TestJavaCopyOnWriteActionExecutor.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/table/action/commit/TestJavaCopyOnWriteActionExecutor.java
index e2ff6065192..f9262c7f939 100644
--- a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/table/action/commit/TestJavaCopyOnWriteActionExecutor.java
+++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/table/action/commit/TestJavaCopyOnWriteActionExecutor.java
@@ -34,7 +34,7 @@ import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.testutils.RawTripTestPayload;
 import org.apache.hudi.common.testutils.Transformations;
-import org.apache.hudi.common.util.BaseFileUtils;
+import org.apache.hudi.common.util.FileFormatUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -131,7 +131,7 @@ public class TestJavaCopyOnWriteActionExecutor extends HoodieJavaClientTestHarne
     HoodieJavaWriteClient writeClient = getHoodieWriteClient(config);
     writeClient.startCommitWithTime(firstCommitTime);
     metaClient = HoodieTableMetaClient.reload(metaClient);
-    BaseFileUtils fileUtils = getFileUtilsInstance(metaClient);
+    FileFormatUtils fileUtils = getFileUtilsInstance(metaClient);
 
     String partitionPath = "2016/01/31";
 
@@ -480,7 +480,7 @@ public class TestJavaCopyOnWriteActionExecutor extends HoodieJavaClientTestHarne
     HoodieJavaWriteClient writeClient = getHoodieWriteClient(config);
     writeClient.startCommitWithTime(firstCommitTime);
     metaClient = HoodieTableMetaClient.reload(metaClient);
-    BaseFileUtils fileUtils = getFileUtilsInstance(metaClient);
+    FileFormatUtils fileUtils = getFileUtilsInstance(metaClient);
 
     String partitionPath = "2022/04/09";
 
diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java
index 61429b3fef2..de3c9612f85 100644
--- a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java
+++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java
@@ -50,7 +50,7 @@ import org.apache.hudi.common.table.view.SyncableFileSystemView;
 import org.apache.hudi.common.table.view.TableFileSystemView;
 import org.apache.hudi.common.testutils.HoodieTestTable;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
-import org.apache.hudi.common.util.BaseFileUtils;
+import org.apache.hudi.common.util.FileFormatUtils;
 import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
@@ -908,7 +908,7 @@ public abstract class HoodieJavaClientTestHarness extends HoodieWriterClientTest
       HashMap<String, String> paths =
           getLatestFileIDsToFullPath(basePath, commitTimeline, Arrays.asList(commitInstant));
       return paths.values().stream().map(StoragePath::new).flatMap(path ->
-              BaseFileUtils.getInstance(path).readAvroRecords(context.getStorageConf(), path).stream())
+              FileFormatUtils.getInstance(path).readAvroRecords(context.getStorageConf(), path).stream())
           .filter(record -> {
             if (filterByCommitTime) {
               Object commitTime = record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD);
@@ -937,7 +937,7 @@ public abstract class HoodieJavaClientTestHarness extends HoodieWriterClientTest
     try {
       List<HoodieBaseFile> latestFiles = getLatestBaseFiles(basePath, storage, paths);
      return latestFiles.stream().mapToLong(baseFile ->
-              BaseFileUtils.getInstance(baseFile.getStoragePath())
+              FileFormatUtils.getInstance(baseFile.getStoragePath())
                  .readAvroRecords(context.getStorageConf(), baseFile.getStoragePath()).size())
          .sum();
     } catch (Exception e) {
@@ -975,7 +975,7 @@ public abstract class HoodieJavaClientTestHarness extends HoodieWriterClientTest
       HashMap<String, String> fileIdToFullPath = getLatestFileIDsToFullPath(basePath, commitTimeline, commitsToReturn);
       String[] paths = fileIdToFullPath.values().toArray(new String[fileIdToFullPath.size()]);
       if (paths[0].endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
-        return Arrays.stream(paths).map(StoragePath::new).flatMap(path -> BaseFileUtils.getInstance(path).readAvroRecords(context.getStorageConf(), path).stream())
+        return Arrays.stream(paths).map(StoragePath::new).flatMap(path -> FileFormatUtils.getInstance(path).readAvroRecords(context.getStorageConf(), path).stream())
            .filter(record -> {
              if (lastCommitTimeOpt.isPresent()) {
                Object commitTime = record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD);
@@ -1022,8 +1022,8 @@ public abstract class HoodieJavaClientTestHarness extends HoodieWriterClientTest
     return builder;
   }
 
-  public static BaseFileUtils getFileUtilsInstance(HoodieTableMetaClient metaClient) {
-    return BaseFileUtils.getInstance(metaClient.getTableConfig().getBaseFileFormat());
+  public static FileFormatUtils getFileUtilsInstance(HoodieTableMetaClient metaClient) {
+    return FileFormatUtils.getInstance(metaClient.getTableConfig().getBaseFileFormat());
   }
 
   protected HoodieTableMetaClient createMetaClient() {
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java
index 987bcf8ddd4..7203eb8a572 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java
@@ -24,7 +24,7 @@ import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieSparkRecord;
-import org.apache.hudi.common.util.BaseFileUtils;
+import org.apache.hudi.common.util.FileFormatUtils;
 import org.apache.hudi.common.util.ParquetReaderIterator;
 import org.apache.hudi.common.util.ParquetUtils;
 import org.apache.hudi.common.util.StringUtils;
@@ -61,7 +61,7 @@ public class HoodieSparkParquetReader implements HoodieSparkFileReader {
 
   private final StoragePath path;
   private final StorageConfiguration<?> conf;
-  private final BaseFileUtils parquetUtils;
+  private final FileFormatUtils parquetUtils;
   private List<ParquetReaderIterator> readerIterators = new ArrayList<>();
 
   public HoodieSparkParquetReader(StorageConfiguration<?> conf, StoragePath path) {
@@ -69,7 +69,7 @@ public class HoodieSparkParquetReader implements HoodieSparkFileReader {
     this.conf = conf.newInstance();
     // Avoid adding record in list element when convert parquet schema to avro schema
     conf.set(ADD_LIST_ELEMENT_RECORDS, "false");
-    this.parquetUtils = BaseFileUtils.getInstance(HoodieFileFormat.PARQUET);
+    this.parquetUtils = FileFormatUtils.getInstance(HoodieFileFormat.PARQUET);
   }
 
   @Override
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowFileWriterFactory.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowFileWriterFactory.java
index 8e7287a7024..7ebcd1f39ff 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowFileWriterFactory.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowFileWriterFactory.java
@@ -34,6 +34,7 @@ import org.apache.spark.sql.types.StructType;
 import java.io.IOException;
 
 import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
+import static org.apache.hudi.common.util.ParquetUtils.getCompressionCodecName;
 
 /**
  * Factory to assist in instantiating a new {@link HoodieInternalRowFileWriter}.
@@ -76,7 +77,7 @@ public class HoodieInternalRowFileWriterFactory {
         path,
         new HoodieParquetConfig<>(
             writeSupport,
-            writeConfig.getParquetCompressionCodec(),
+            getCompressionCodecName(writeConfig.getParquetCompressionCodec()),
             writeConfig.getParquetBlockSize(),
             writeConfig.getParquetPageSize(),
             writeConfig.getParquetMaxFileSize(),
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
index e62f22b0ad0..f115b7c7202 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
@@ -28,7 +28,7 @@ import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.testutils.InProcessTimeGenerator;
 import org.apache.hudi.common.testutils.RawTripTestPayload;
-import org.apache.hudi.common.util.BaseFileUtils;
+import org.apache.hudi.common.util.FileFormatUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieUpsertException;
@@ -134,7 +134,7 @@ public class TestUpdateSchemaEvolution extends HoodieSparkClientTestHarness impl
       Executable executable = () -> {
         HoodieMergeHandle mergeHandle = new HoodieMergeHandle(updateTable.getConfig(), "101", updateTable,
             updateRecords.iterator(), updateRecords.get(0).getPartitionPath(), insertResult.getFileId(), supplier, Option.empty());
-        List<GenericRecord> oldRecords = BaseFileUtils.getInstance(updateTable.getBaseFileFormat())
+        List<GenericRecord> oldRecords = FileFormatUtils.getInstance(updateTable.getBaseFileFormat())
            .readAvroRecords(updateTable.getStorageConf(),
                new StoragePath(updateTable.getConfig().getBasePath() + "/" + insertResult.getStat().getPath()),
                mergeHandle.getWriterSchemaWithMetaFields());
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
index a40a3c4eaea..2a39f1eb5ae 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
@@ -75,9 +75,9 @@ import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.testutils.HoodieTestTable;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.testutils.RawTripTestPayload;
-import org.apache.hudi.common.util.BaseFileUtils;
 import org.apache.hudi.common.util.ClusteringUtils;
 import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.common.util.FileFormatUtils;
 import org.apache.hudi.common.util.MarkerUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
@@ -1199,7 +1199,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
 
     dataGen = new HoodieTestDataGenerator(new String[] {testPartitionPath});
     SparkRDDWriteClient client = getHoodieWriteClient(config);
-    BaseFileUtils fileUtils = getFileUtilsInstance(metaClient);
+    FileFormatUtils fileUtils = getFileUtilsInstance(metaClient);
 
     // Inserts => will write file1
     String commitTime1 = "001";
@@ -1312,7 +1312,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
     HoodieWriteConfig config = getSmallInsertWriteConfig(insertSplitLimit, false, mergeAllowDuplicateInserts); // hold upto 200 records max
     dataGen = new HoodieTestDataGenerator(new String[] {testPartitionPath});
     SparkRDDWriteClient client = getHoodieWriteClient(config);
-    BaseFileUtils fileUtils = getFileUtilsInstance(metaClient);
+    FileFormatUtils fileUtils = getFileUtilsInstance(metaClient);
 
     // Inserts => will write file1
     String commitTime1 = "001";
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java
index d0891e70463..9b832bd6af6 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java
@@ -36,7 +36,7 @@ import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.testutils.RawTripTestPayload;
 import org.apache.hudi.common.testutils.Transformations;
-import org.apache.hudi.common.util.BaseFileUtils;
+import org.apache.hudi.common.util.FileFormatUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieIndexConfig;
@@ -205,14 +205,14 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestBase implemen
 
     // Read out the bloom filter and make sure filter can answer record exist or not
     Path filePath = allFiles[0].getPath();
-    BloomFilter filter = BaseFileUtils.getInstance(table.getBaseFileFormat())
+    BloomFilter filter = FileFormatUtils.getInstance(table.getBaseFileFormat())
         .readBloomFilterFromMetadata(storageConf, new StoragePath(filePath.toUri()));
     for (HoodieRecord record : records) {
       assertTrue(filter.mightContain(record.getRecordKey()));
     }
 
     // Read the base file, check the record content
-    List<GenericRecord> fileRecords = BaseFileUtils.getInstance(table.getBaseFileFormat())
+    List<GenericRecord> fileRecords = FileFormatUtils.getInstance(table.getBaseFileFormat())
         .readAvroRecords(storageConf, new StoragePath(filePath.toUri()));
     GenericRecord newRecord;
     int index = 0;
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
index 2c4715174d4..f5792f3b92f 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
@@ -32,7 +32,7 @@ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.view.SyncableFileSystemView;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
-import org.apache.hudi.common.util.BaseFileUtils;
+import org.apache.hudi.common.util.FileFormatUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.index.HoodieIndex;
@@ -639,7 +639,7 @@ public class HoodieClientTestBase extends HoodieSparkClientTestHarness {
     return hoodieCleanStatsTwo.stream().filter(e -> e.getPartitionPath().equals(partitionPath)).findFirst().orElse(null);
   }
 
-  public static BaseFileUtils getFileUtilsInstance(HoodieTableMetaClient metaClient) {
-    return BaseFileUtils.getInstance(metaClient.getTableConfig().getBaseFileFormat());
+  public static FileFormatUtils getFileUtilsInstance(HoodieTableMetaClient metaClient) {
+    return FileFormatUtils.getInstance(metaClient.getTableConfig().getBaseFileFormat());
   }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java
index e8edc8b9142..5d75414c6ff 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java
@@ -18,7 +18,7 @@
 
 package org.apache.hudi.common.model;
 
-import org.apache.hudi.common.util.BaseFileUtils;
+import org.apache.hudi.common.util.FileFormatUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.RetryHelper;
 import org.apache.hudi.common.util.StringUtils;
@@ -137,7 +137,7 @@ public class HoodiePartitionMetadata {
         HOODIE_PARTITION_METAFILE_PREFIX + "_" + UUID.randomUUID() + getMetafileExtension());
     try {
       // write to temporary file
-      BaseFileUtils.getInstance(format).writeMetaFile(storage, tmpPath, props);
+      FileFormatUtils.getInstance(format).writeMetaFile(storage, tmpPath, props);
       // move to actual path
       storage.rename(tmpPath, filePath);
     } finally {
@@ -185,7 +185,7 @@ public class HoodiePartitionMetadata {
   private boolean readBaseFormatMetaFile() {
     for (StoragePath metafilePath : baseFormatMetaFilePaths(partitionPath)) {
       try {
-        BaseFileUtils reader = BaseFileUtils.getInstance(metafilePath);
+        FileFormatUtils reader = FileFormatUtils.getInstance(metafilePath);
         // Data file format
         Map<String, String> metadata = reader.readFooter(
             storage.getConf(), true, metafilePath, PARTITION_DEPTH_KEY, COMMIT_TIME_KEY);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
index b284fa4f881..aec5dc73ad6 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
@@ -32,7 +32,7 @@ import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.util.BaseFileUtils;
+import org.apache.hudi.common.util.FileFormatUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
@@ -306,7 +306,7 @@ public class TableSchemaResolver {
         .orElseThrow(() -> new IllegalArgumentException("Could not find any data file written for compaction "
             + lastCompactionCommit + ", could not get schema for table " + metaClient.getBasePath()));
     StoragePath path = new StoragePath(filePath);
-    return BaseFileUtils.getInstance(path).readAvroSchema(metaClient.getStorageConf(), path);
+    return FileFormatUtils.getInstance(path).readAvroSchema(metaClient.getStorageConf(), path);
   }
 
   private Schema readSchemaFromLogFile(StoragePath path) throws IOException {
@@ -473,7 +473,7 @@ public class TableSchemaResolver {
         // this is a log file
         schema = readSchemaFromLogFile(filePath);
       } else {
-        schema = BaseFileUtils.getInstance(filePath).readAvroSchema(metaClient.getStorageConf(), filePath);
+        schema = FileFormatUtils.getInstance(filePath).readAvroSchema(metaClient.getStorageConf(), filePath);
       }
     }
     return schema;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
index 4adbe3855fb..8df2ee4e6a6 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
@@ -25,11 +25,10 @@ import org.apache.hudi.common.engine.HoodieReaderContext;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
+import org.apache.hudi.common.util.FileFormatUtils;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.CloseableMappingIterator;
-import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.io.SeekableDataInputStream;
 import org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase;
 import org.apache.hudi.io.storage.HoodieFileReader;
@@ -43,29 +42,17 @@ import org.apache.hudi.storage.inline.InLineFSUtils;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.io.compress.Compression;
-import org.apache.hadoop.hbase.io.hfile.CacheConfig;
-import org.apache.hadoop.hbase.io.hfile.HFile;
-import org.apache.hadoop.hbase.io.hfile.HFileContext;
-import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
-import java.util.TreeMap;
 import java.util.function.Supplier;
 
-import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
+import static org.apache.hudi.common.config.HoodieStorageConfig.HFILE_COMPRESSION_ALGORITHM_NAME;
 import static org.apache.hudi.common.util.TypeUtils.unsafeCast;
 import static org.apache.hudi.common.util.ValidationUtils.checkState;
 
@@ -75,10 +62,8 @@ import static org.apache.hudi.common.util.ValidationUtils.checkState;
  */
 public class HoodieHFileDataBlock extends HoodieDataBlock {
   private static final Logger LOG = LoggerFactory.getLogger(HoodieHFileDataBlock.class);
-  private static final int DEFAULT_BLOCK_SIZE = 1024 * 1024;
-  private static final String KV_COMPARATOR_CLASS_NAME = "org.apache.hudi.io.storage.HoodieHBaseKVComparator";
 
-  private final Option<Compression.Algorithm> compressionAlgorithm;
+  private final Option<String> compressionCodec;
   // This path is used for constructing HFile reader context, which should not be
   // interpreted as the actual file path for the HFile data blocks
   private final StoragePath pathForReader;
@@ -96,18 +81,18 @@ public class HoodieHFileDataBlock extends HoodieDataBlock {
                               boolean useNativeHFileReader) {
     super(content, inputStreamSupplier, readBlockLazily, Option.of(logBlockContentLocation), readerSchema,
         header, footer, HoodieAvroHFileReaderImplBase.KEY_FIELD_NAME, enablePointLookups);
-    this.compressionAlgorithm = Option.empty();
+    this.compressionCodec = Option.empty();
     this.pathForReader = pathForReader;
     this.hFileReaderConfig = getHFileReaderConfig(useNativeHFileReader);
   }
 
   public HoodieHFileDataBlock(List<HoodieRecord> records,
                               Map<HeaderMetadataType, String> header,
-                              Compression.Algorithm compressionAlgorithm,
+                              String compressionCodec,
                               StoragePath pathForReader,
                               boolean useNativeHFileReader) {
     super(records, false, header, new HashMap<>(), HoodieAvroHFileReaderImplBase.KEY_FIELD_NAME);
-    this.compressionAlgorithm = Option.of(compressionAlgorithm);
+    this.compressionCodec = Option.of(compressionCodec);
     this.pathForReader = pathForReader;
     this.hFileReaderConfig = getHFileReaderConfig(useNativeHFileReader);
   }
@@ -119,70 +104,11 @@ public class HoodieHFileDataBlock extends HoodieDataBlock {
 
   @Override
   protected byte[] serializeRecords(List<HoodieRecord> records, StorageConfiguration<?> storageConf) throws IOException {
-    HFileContext context = new HFileContextBuilder()
-        .withBlockSize(DEFAULT_BLOCK_SIZE)
-        .withCompression(compressionAlgorithm.get())
-        .withCellComparator(ReflectionUtils.loadClass(KV_COMPARATOR_CLASS_NAME))
-        .build();
-
-    Configuration conf = storageConf.unwrapAs(Configuration.class);
-    CacheConfig cacheConfig = new CacheConfig(conf);
-    ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    FSDataOutputStream ostream = new FSDataOutputStream(baos, null);
-
-    // Use simple incrementing counter as a key
-    boolean useIntegerKey = !getRecordKey(records.get(0)).isPresent();
-    // This is set here to avoid re-computing this in the loop
-    int keyWidth = useIntegerKey ? (int) Math.ceil(Math.log(records.size())) + 1 : -1;
-
-    // Serialize records into bytes
-    Map<String, List<byte[]>> sortedRecordsMap = new TreeMap<>();
-    // Get writer schema
-    Schema writerSchema = new Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
-
-    Iterator<HoodieRecord> itr = records.iterator();
-    int id = 0;
-    while (itr.hasNext()) {
-      HoodieRecord<?> record = itr.next();
-      String recordKey;
-      if (useIntegerKey) {
-        recordKey = String.format("%" + keyWidth + "s", id++);
-      } else {
-        recordKey = getRecordKey(record).get();
-      }
-
-      final byte[] recordBytes = serializeRecord(record, writerSchema);
-      // If key exists in the map, append to its list. If not, create a new list.
-      // Get the existing list of recordBytes for the recordKey, or an empty list if it doesn't exist
-      List<byte[]> recordBytesList = sortedRecordsMap.getOrDefault(recordKey, new ArrayList<>());
-      recordBytesList.add(recordBytes);
-      // Put the updated list back into the map
-      sortedRecordsMap.put(recordKey, recordBytesList);
-    }
-
-    HFile.Writer writer = HFile.getWriterFactory(conf, cacheConfig)
-        .withOutputStream(ostream).withFileContext(context).create();
-
-    // Write the records
-    sortedRecordsMap.forEach((recordKey, recordBytesList) -> {
-      for (byte[] recordBytes : recordBytesList) {
-        try {
-          KeyValue kv = new KeyValue(recordKey.getBytes(), null, null, recordBytes);
-          writer.append(kv);
-        } catch (IOException e) {
-          throw new HoodieIOException("IOException serializing records", e);
-        }
-      }
-    });
-
-    writer.appendFileInfo(
-        getUTF8Bytes(HoodieAvroHFileReaderImplBase.SCHEMA_KEY), getUTF8Bytes(getSchema().toString()));
-
-    writer.close();
-    ostream.flush();
-    ostream.close();
-
-    return baos.toByteArray();
+    Schema writerSchema = new Schema.Parser().parse(
+        super.getLogBlockHeader().get(HoodieLogBlock.HeaderMetadataType.SCHEMA));
+    return FileFormatUtils.getInstance(HoodieFileFormat.HFILE).serializeRecordsToLogBlock(
+        storageConf, records, writerSchema, getSchema(), getKeyFieldName(),
+        Collections.singletonMap(HFILE_COMPRESSION_ALGORITHM_NAME.key(), compressionCodec.get()));
   }
 
   @Override
@@ -241,15 +167,6 @@ public class HoodieHFileDataBlock extends HoodieDataBlock {
     }
   }
 
-  private byte[] serializeRecord(HoodieRecord<?> record, Schema schema) throws IOException {
-    Option<Schema.Field> keyField = getKeyField(schema);
-    // Reset key value w/in the record to avoid duplicating the key w/in payload
-    if (keyField.isPresent()) {
-      record.truncateRecordKey(schema, new Properties(), keyField.get().name());
-    }
-    return HoodieAvroUtils.recordToBytes(record, schema).get();
-  }
-
   /**
    * Print the record in json format
    */
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
index 53285b8d4a2..616177f80b6 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
@@ -18,38 +18,29 @@
 
 package org.apache.hudi.common.table.log.block;
 
-import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.engine.HoodieReaderContext;
-import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
+import org.apache.hudi.common.util.FileFormatUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.io.SeekableDataInputStream;
-import org.apache.hudi.io.storage.HoodieFileWriter;
-import org.apache.hudi.io.storage.HoodieFileWriterFactory;
 import org.apache.hudi.io.storage.HoodieIOFactory;
 import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.inline.InLineFSUtils;
 
 import org.apache.avro.Schema;
-import org.apache.parquet.hadoop.ParquetWriter;
-import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Supplier;
 
-import static org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_BLOCK_SIZE;
 import static org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME;
 import static org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION;
 import static org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_DICTIONARY_ENABLED;
-import static org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_MAX_FILE_SIZE;
-import static org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_PAGE_SIZE;
 import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
 import static org.apache.hudi.common.util.ConfigUtils.DEFAULT_HUDI_CONFIG_FOR_READER;
 
@@ -58,7 +49,7 @@ import static org.apache.hudi.common.util.ConfigUtils.DEFAULT_HUDI_CONFIG_FOR_RE
  */
 public class HoodieParquetDataBlock extends HoodieDataBlock {
 
-  private final Option<CompressionCodecName> compressionCodecName;
+  private final Option<String> compressionCodecName;
   private final Option<Double> expectedCompressionRatio;
   private final Option<Boolean> useDictionaryEncoding;
 
@@ -81,7 +72,7 @@ public class HoodieParquetDataBlock extends HoodieDataBlock {
                                 boolean shouldWriteRecordPositions,
                                 Map<HeaderMetadataType, String> header,
                                 String keyField,
-                                CompressionCodecName compressionCodecName,
+                                String compressionCodecName,
                                 double expectedCompressionRatio,
                                 boolean useDictionaryEncoding
   ) {
@@ -99,29 +90,15 @@ public class HoodieParquetDataBlock extends HoodieDataBlock {
 
   @Override
   protected byte[] serializeRecords(List<HoodieRecord> records, StorageConfiguration<?> storageConf) throws IOException {
-    if (records.size() == 0) {
-      return new byte[0];
-    }
-
-    Schema writerSchema = new Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
-    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-    HoodieConfig config = new HoodieConfig();
-    config.setValue(PARQUET_COMPRESSION_CODEC_NAME.key(), compressionCodecName.get().name());
-    config.setValue(PARQUET_BLOCK_SIZE.key(), String.valueOf(ParquetWriter.DEFAULT_BLOCK_SIZE));
-    config.setValue(PARQUET_PAGE_SIZE.key(), String.valueOf(ParquetWriter.DEFAULT_PAGE_SIZE));
-    config.setValue(PARQUET_MAX_FILE_SIZE.key(), String.valueOf(1024 * 1024 * 1024));
-    config.setValue(PARQUET_COMPRESSION_RATIO_FRACTION.key(), String.valueOf(expectedCompressionRatio.get()));
-    config.setValue(PARQUET_DICTIONARY_ENABLED, String.valueOf(useDictionaryEncoding.get()));
-    HoodieRecordType recordType = records.iterator().next().getRecordType();
-    try (HoodieFileWriter parquetWriter = HoodieFileWriterFactory.getFileWriter(
-        HoodieFileFormat.PARQUET, outputStream, storageConf, config, writerSchema, recordType)) {
-      for (HoodieRecord<?> record : records) {
-        String recordKey = getRecordKey(record).orElse(null);
-        parquetWriter.write(recordKey, record, writerSchema);
-      }
-      outputStream.flush();
-    }
-    return outputStream.toByteArray();
+    Map<String, String> paramsMap = new HashMap<>();
+    paramsMap.put(PARQUET_COMPRESSION_CODEC_NAME.key(), compressionCodecName.get());
+    paramsMap.put(PARQUET_COMPRESSION_RATIO_FRACTION.key(), String.valueOf(expectedCompressionRatio.get()));
+    paramsMap.put(PARQUET_DICTIONARY_ENABLED.key(), String.valueOf(useDictionaryEncoding.get()));
+    Schema writerSchema = new Schema.Parser().parse(
+        super.getLogBlockHeader().get(HoodieLogBlock.HeaderMetadataType.SCHEMA));
+
+    return FileFormatUtils.getInstance(PARQUET).serializeRecordsToLogBlock(
+        storageConf, records, writerSchema, getSchema(), getKeyFieldName(), paramsMap);
   }
 
   /**
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/FileFormatUtils.java
similarity index 91%
rename from hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java
rename to hudi-common/src/main/java/org/apache/hudi/common/util/FileFormatUtils.java
index e64d4596d2a..44655a57fb9 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/FileFormatUtils.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.bloom.BloomFilterTypeCode;
 import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
@@ -47,14 +48,14 @@ import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
- * Utils for Hudi base file.
+ * Utils for file formats used in Hudi.
  */
-public abstract class BaseFileUtils {
+public abstract class FileFormatUtils {
   public static final String PARQUET_UTILS = "org.apache.hudi.common.util.ParquetUtils";
   public static final String ORC_UTILS = "org.apache.hudi.common.util.OrcUtils";
   public static final String HFILE_UTILS = "org.apache.hudi.common.util.HFileUtils";
 
-  public static BaseFileUtils getInstance(StoragePath path) {
+  public static FileFormatUtils getInstance(StoragePath path) {
     if (path.getFileExtension().equals(HoodieFileFormat.PARQUET.getFileExtension())) {
       return ReflectionUtils.loadClass(PARQUET_UTILS);
     } else if (path.getFileExtension().equals(HoodieFileFormat.ORC.getFileExtension())) {
@@ -65,7 +66,7 @@ public abstract class BaseFileUtils {
     throw new UnsupportedOperationException("The format for file " + path + " is not supported yet.");
   }
 
-  public static BaseFileUtils getInstance(HoodieFileFormat fileFormat) {
+  public static FileFormatUtils getInstance(HoodieFileFormat fileFormat) {
     if (HoodieFileFormat.PARQUET.equals(fileFormat)) {
       return ReflectionUtils.loadClass(PARQUET_UTILS);
     } else if (HoodieFileFormat.ORC.equals(fileFormat)) {
@@ -85,7 +86,7 @@ public abstract class BaseFileUtils {
     ValidationUtils.checkArgument(!fileColumnRanges.isEmpty(), "fileColumnRanges should not be empty.");
     // There are multiple files. Compute min(file_mins) and max(file_maxs)
     return fileColumnRanges.stream()
-        .reduce(BaseFileUtils::mergeRanges).orElseThrow(() -> new HoodieException("MergingColumnRanges failed."));
+        .reduce(FileFormatUtils::mergeRanges).orElseThrow(() -> new HoodieException("MergingColumnRanges failed."));
   }
 
   private static <T extends Comparable<T>> HoodieColumnRangeMetadata<T> mergeRanges(HoodieColumnRangeMetadata<T> one,
@@ -319,4 +320,22 @@ public abstract class BaseFileUtils {
   public abstract void writeMetaFile(HoodieStorage storage,
                                      StoragePath filePath,
                                      Properties props) throws IOException;
+
+  /**
+   * Serializes Hudi records to the log block.
+   *
+   * @param storageConf  storage configuration.
+   * @param records      a list of {@link HoodieRecord}.
+   * @param writerSchema writer schema from the log block header.
+   * @param readerSchema reader schema for the records.
+   * @param keyFieldName name of the record key field.
+   * @param paramsMap    additional params for serialization.
+   * @return byte array after serialization.
+   * @throws IOException upon serialization error.
+   */
+  public abstract byte[] serializeRecordsToLogBlock(StorageConfiguration<?> storageConf,
+                                                    List<HoodieRecord> records,
+                                                    Schema writerSchema,
+                                                    Schema readerSchema, String keyFieldName,
+                                                    Map<String, String> paramsMap) throws IOException;
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 1eff00a95da..6441f85c532 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -67,9 +67,9 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
-import org.apache.hudi.common.util.BaseFileUtils;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.ExternalFilePathUtil;
+import org.apache.hudi.common.util.FileFormatUtils;
 import org.apache.hudi.common.util.FileIOUtils;
 import org.apache.hudi.common.util.HoodieRecordUtils;
 import org.apache.hudi.common.util.Option;
@@ -1212,7 +1212,7 @@ public class HoodieTableMetadataUtil {
     try {
       if (filePath.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
         StoragePath fullFilePath = new StoragePath(datasetMetaClient.getBasePathV2(), filePath);
-        return BaseFileUtils.getInstance(HoodieFileFormat.PARQUET)
+        return FileFormatUtils.getInstance(HoodieFileFormat.PARQUET)
            .readColumnStatsFromMetadata(datasetMetaClient.getStorageConf(), fullFilePath, columnsToIndex);
       }
 
@@ -1893,7 +1893,7 @@ public class HoodieTableMetadataUtil {
           .collect(Collectors.groupingBy(HoodieColumnRangeMetadata::getColumnName, toList())); // Group by column name
       // Step 3: Aggregate Column Ranges
       Stream<HoodieColumnRangeMetadata<Comparable>> partitionStatsRangeMetadata = columnMetadataMap.entrySet().stream()
-          .map(entry -> BaseFileUtils.getColumnRangeInPartition(entry.getValue()));
+          .map(entry -> FileFormatUtils.getColumnRangeInPartition(entry.getValue()));
      return HoodieMetadataPayload.createPartitionStatsRecords(partitionPath, partitionStatsRangeMetadata.collect(toList()), false).iterator();
     });
   }
@@ -1957,7 +1957,7 @@ public class HoodieTableMetadataUtil {
             .collect(Collectors.groupingBy(HoodieColumnRangeMetadata::getColumnName, toList())); // Group by column name
         // Step 3: Aggregate Column Ranges
         Stream<HoodieColumnRangeMetadata<Comparable>> partitionStatsRangeMetadata = columnMetadataMap.entrySet().stream()
-            .map(entry -> BaseFileUtils.getColumnRangeInPartition(entry.getValue()));
+            .map(entry -> FileFormatUtils.getColumnRangeInPartition(entry.getValue()));
        return HoodieMetadataPayload.createPartitionStatsRecords(partitionName, partitionStatsRangeMetadata.collect(toList()), false).iterator();
       });
     } catch (Exception e) {
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileSliceTestUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileSliceTestUtils.java
index 01052d4b00f..2a736709b42 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileSliceTestUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileSliceTestUtils.java
@@ -56,7 +56,6 @@ import org.apache.avro.Schema;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.parquet.hadoop.ParquetWriter;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 
@@ -70,6 +69,8 @@ import java.util.UUID;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static org.apache.hudi.common.config.HoodieStorageConfig.HFILE_COMPRESSION_ALGORITHM_NAME;
+import static org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME;
 import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType.DELETE_BLOCK;
 import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType.PARQUET_DATA_BLOCK;
 import static org.apache.hudi.common.testutils.FileCreateUtils.baseFileName;
@@ -198,7 +199,7 @@ public class HoodieFileSliceTestUtils {
         return new HoodieHFileDataBlock(
             records,
             header,
-            Compression.Algorithm.GZ,
+            HFILE_COMPRESSION_ALGORITHM_NAME.defaultValue(),
             pathForReader,
             HoodieReaderConfig.USE_NATIVE_HFILE_READER.defaultValue());
       case PARQUET_DATA_BLOCK:
@@ -207,7 +208,7 @@ public class HoodieFileSliceTestUtils {
             false,
             header,
             HoodieRecord.RECORD_KEY_METADATA_FIELD,
-            CompressionCodecName.GZIP,
+            PARQUET_COMPRESSION_CODEC_NAME.defaultValue(),
             0.1,
             true);
       default:
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestBaseFileUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestBaseFileUtils.java
index 080eaa6b09e..3be4ff9b43c 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestBaseFileUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestBaseFileUtils.java
@@ -40,7 +40,7 @@ public class TestBaseFileUtils {
         "path/to/file2", "columnName", 3, 8, 1, 15, 120, 250);
     List<HoodieColumnRangeMetadata<Comparable>> fileColumnRanges = Arrays.asList(fileColumnRange1, fileColumnRange2);
     // Step 2: Call the Method
-    HoodieColumnRangeMetadata<Comparable> result = BaseFileUtils.getColumnRangeInPartition(fileColumnRanges);
+    HoodieColumnRangeMetadata<Comparable> result = FileFormatUtils.getColumnRangeInPartition(fileColumnRanges);
     // Step 3: Assertions
     assertEquals(Integer.valueOf(1), new Integer(result.getMinValue().toString()));
     assertEquals(Integer.valueOf(8), new Integer(result.getMaxValue().toString()));
@@ -60,7 +60,7 @@ public class TestBaseFileUtils {
 
     List<HoodieColumnRangeMetadata<Comparable>> fileColumnRanges = Arrays.asList(fileColumnRange1, fileColumnRange2);
     // Step 2: Call the Method
-    HoodieColumnRangeMetadata<Comparable> result = BaseFileUtils.getColumnRangeInPartition(fileColumnRanges);
+    HoodieColumnRangeMetadata<Comparable> result = FileFormatUtils.getColumnRangeInPartition(fileColumnRanges);
     // Step 3: Assertions
     assertEquals(Integer.valueOf(1), new Integer(result.getMinValue().toString()));
     assertEquals(Integer.valueOf(8), new Integer(result.getMaxValue().toString()));
@@ -79,6 +79,6 @@ public class TestBaseFileUtils {
         "path/to/file2", "columnName2", null, 8, 1, 15, 120, 250);
     List<HoodieColumnRangeMetadata<Comparable>> fileColumnRanges = Arrays.asList(fileColumnRange1, fileColumnRange2);
     // Step 2: Call the Method
-    assertThrows(IllegalArgumentException.class, () -> BaseFileUtils.getColumnRangeInPartition(fileColumnRanges));
+    assertThrows(IllegalArgumentException.class, () -> FileFormatUtils.getColumnRangeInPartition(fileColumnRanges));
   }
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
index b171560643b..ecd331079a1 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
@@ -30,7 +30,7 @@ import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.util.BaseFileUtils;
+import org.apache.hudi.common.util.FileFormatUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.ClosableIterator;
@@ -202,7 +202,7 @@ public class BootstrapOperator<I, O extends HoodieRecord<?>>
     Option<HoodieInstant> latestCommitTime = commitsTimeline.filterCompletedAndCompactionInstants().lastInstant();
 
     if (latestCommitTime.isPresent()) {
-      BaseFileUtils fileUtils = BaseFileUtils.getInstance(this.hoodieTable.getBaseFileFormat());
+      FileFormatUtils fileUtils = FileFormatUtils.getInstance(this.hoodieTable.getBaseFileFormat());
       Schema schema = new TableSchemaResolver(this.hoodieTable.getMetaClient()).getTableAvroSchema();
 
       List<FileSlice> fileSlices = this.hoodieTable.getSliceView()
diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/HFileUtils.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/HFileUtils.java
index d42a15b9e7c..b746a92250e 100644
--- a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/HFileUtils.java
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/HFileUtils.java
@@ -19,6 +19,7 @@
 
 package org.apache.hudi.common.util;
 
+import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieKey;
@@ -26,7 +27,10 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.io.compress.CompressionCodec;
+import org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase;
 import org.apache.hudi.io.storage.HoodieFileReader;
+import org.apache.hudi.io.storage.HoodieHBaseKVComparator;
 import org.apache.hudi.io.storage.HoodieIOFactory;
 import org.apache.hudi.keygen.BaseKeyGenerator;
 import org.apache.hudi.storage.HoodieStorage;
@@ -35,21 +39,50 @@ import org.apache.hudi.storage.StoragePath;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.TreeMap;
+
+import static org.apache.hudi.common.config.HoodieStorageConfig.HFILE_COMPRESSION_ALGORITHM_NAME;
+import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
 
 /**
  * Utility functions for HFile files.
  */
-public class HFileUtils extends BaseFileUtils {
-
+public class HFileUtils extends FileFormatUtils {
   private static final Logger LOG = LoggerFactory.getLogger(HFileUtils.class);
+  private static final int DEFAULT_BLOCK_SIZE_FOR_LOG_FILE = 1024 * 1024;
+
+  /**
+   * Gets the {@link Compression.Algorithm} Enum based on the {@link CompressionCodec} name.
+   *
+   * @param paramsMap parameter map containing the compression codec config.
+   * @return the {@link Compression.Algorithm} Enum.
+   */
+  public static Compression.Algorithm getHFileCompressionAlgorithm(Map<String, String> paramsMap) {
+    String algoName = paramsMap.get(HFILE_COMPRESSION_ALGORITHM_NAME.key());
+    if (StringUtils.isNullOrEmpty(algoName)) {
+      return Compression.Algorithm.GZ;
+    }
+    return Compression.Algorithm.valueOf(algoName.toUpperCase());
+  }
 
   @Override
   public List<GenericRecord> readAvroRecords(StorageConfiguration<?> configuration, StoragePath filePath) {
@@ -127,4 +160,89 @@ public class HFileUtils extends BaseFileUtils {
   public void writeMetaFile(HoodieStorage storage, StoragePath filePath, Properties props) throws IOException {
     throw new UnsupportedOperationException("HFileUtils does not support writeMetaFile");
   }
+
+  @Override
+  public byte[] serializeRecordsToLogBlock(StorageConfiguration<?> storageConf,
+                                           List<HoodieRecord> records,
+                                           Schema writerSchema,
+                                           Schema readerSchema,
+                                           String keyFieldName,
+                                           Map<String, String> paramsMap) throws IOException {
+    Compression.Algorithm compressionAlgorithm = getHFileCompressionAlgorithm(paramsMap);
+    HFileContext context = new HFileContextBuilder()
+        .withBlockSize(DEFAULT_BLOCK_SIZE_FOR_LOG_FILE)
+        .withCompression(compressionAlgorithm)
+        .withCellComparator(new HoodieHBaseKVComparator())
+        .build();
+
+    Configuration conf = storageConf.unwrapAs(Configuration.class);
+    CacheConfig cacheConfig = new CacheConfig(conf);
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    FSDataOutputStream ostream = new FSDataOutputStream(baos, null);
+
+    // Use simple incrementing counter as a key
+    boolean useIntegerKey = !getRecordKey(records.get(0), readerSchema, keyFieldName).isPresent();
+    // This is set here to avoid re-computing this in the loop
+    int keyWidth = useIntegerKey ? (int) Math.ceil(Math.log(records.size())) + 1 : -1;
+
+    // Serialize records into bytes
+    Map<String, List<byte[]>> sortedRecordsMap = new TreeMap<>();
+
+    Iterator<HoodieRecord> itr = records.iterator();
+    int id = 0;
+    while (itr.hasNext()) {
+      HoodieRecord<?> record = itr.next();
+      String recordKey;
+      if (useIntegerKey) {
+        recordKey = String.format("%" + keyWidth + "s", id++);
+      } else {
+        recordKey = getRecordKey(record, readerSchema, keyFieldName).get();
+      }
+
+      final byte[] recordBytes = serializeRecord(record, writerSchema, keyFieldName);
+      // If key exists in the map, append to its list. If not, create a new list.
+      // Get the existing list of recordBytes for the recordKey, or an empty list if it doesn't exist
+      List<byte[]> recordBytesList = sortedRecordsMap.getOrDefault(recordKey, new ArrayList<>());
+      recordBytesList.add(recordBytes);
+      // Put the updated list back into the map
+      sortedRecordsMap.put(recordKey, recordBytesList);
+    }
+
+    HFile.Writer writer = HFile.getWriterFactory(conf, cacheConfig)
+        .withOutputStream(ostream).withFileContext(context).create();
+
+    // Write the records
+    sortedRecordsMap.forEach((recordKey, recordBytesList) -> {
+      for (byte[] recordBytes : recordBytesList) {
+        try {
+          KeyValue kv = new KeyValue(recordKey.getBytes(), null, null, recordBytes);
+          writer.append(kv);
+        } catch (IOException e) {
+          throw new HoodieIOException("IOException serializing records", e);
+        }
+      }
+    });
+
+    writer.appendFileInfo(
+        getUTF8Bytes(HoodieAvroHFileReaderImplBase.SCHEMA_KEY), getUTF8Bytes(readerSchema.toString()));
+
+    writer.close();
+    ostream.flush();
+    ostream.close();
+
+    return baos.toByteArray();
+  }
+
+  private static Option<String> getRecordKey(HoodieRecord record, Schema readerSchema, String keyFieldName) {
+    return Option.ofNullable(record.getRecordKey(readerSchema, keyFieldName));
+  }
+
+  private static byte[] serializeRecord(HoodieRecord<?> record, Schema schema, String keyFieldName) throws IOException {
+    Option<Schema.Field> keyField = Option.ofNullable(schema.getField(keyFieldName));
+    // Reset key value w/in the record to avoid duplicating the key w/in payload
+    if (keyField.isPresent()) {
+      record.truncateRecordKey(schema, new Properties(), keyField.get().name());
+    }
+    return HoodieAvroUtils.recordToBytes(record, schema).get();
+  }
 }
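
A hedged usage sketch of the new HFile override above; the helper class and method are hypothetical, and the records, schemas, and storage configuration are assumed to be supplied by the caller's write path:

    import java.io.IOException;
    import java.util.Collections;
    import java.util.List;
    import java.util.Map;

    import org.apache.avro.Schema;
    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.common.util.HFileUtils;
    import org.apache.hudi.storage.StorageConfiguration;

    import static org.apache.hudi.common.config.HoodieStorageConfig.HFILE_COMPRESSION_ALGORITHM_NAME;

    public class HFileLogBlockSketch {
      // Hypothetical helper: records, schemas, and storageConf come from the caller.
      static byte[] toHFileLogBlock(StorageConfiguration<?> storageConf,
                                    List<HoodieRecord> records,
                                    Schema writerSchema,
                                    Schema readerSchema,
                                    String keyFieldName) throws IOException {
        // Codec lookup is case-insensitive; a missing/empty value falls back to GZ.
        Map<String, String> params =
            Collections.singletonMap(HFILE_COMPRESSION_ALGORITHM_NAME.key(), "GZ");
        return new HFileUtils().serializeRecordsToLogBlock(
            storageConf, records, writerSchema, readerSchema, keyFieldName, params);
      }
    }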
diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java
index 6bbae77d4b9..31ff1233289 100644
--- a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java
@@ -69,7 +69,7 @@ import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToHadoopPath;
 /**
  * Utility functions for ORC files.
  */
-public class OrcUtils extends BaseFileUtils {
+public class OrcUtils extends FileFormatUtils {
 
   /**
    * Provides a closable iterator for reading the given ORC file.
@@ -310,4 +310,13 @@ public class OrcUtils extends BaseFileUtils {
       }
     }
   }
+
+  @Override
+  public byte[] serializeRecordsToLogBlock(StorageConfiguration<?> storageConf,
+                                           List<HoodieRecord> records,
+                                           Schema writerSchema,
+                                           Schema readerSchema, String keyFieldName,
+                                           Map<String, String> paramsMap) throws IOException {
+    throw new UnsupportedOperationException("Hudi log blocks do not support ORC format yet");
+  }
 }
diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
index e31e610c7d7..76b3aaf058f 100644
--- a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
@@ -21,6 +21,7 @@ package org.apache.hudi.common.util;
 
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.HoodieAvroWriteSupport;
+import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieKey;
@@ -29,6 +30,8 @@ import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.MetadataNotFoundException;
+import org.apache.hudi.io.storage.HoodieFileWriter;
+import org.apache.hudi.io.storage.HoodieFileWriterFactory;
 import org.apache.hudi.keygen.BaseKeyGenerator;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.HoodieStorageUtils;
@@ -60,6 +63,7 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nonnull;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.math.BigInteger;
@@ -75,10 +79,14 @@ import java.util.stream.Collector;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_BLOCK_SIZE;
+import static org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_MAX_FILE_SIZE;
+import static org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_PAGE_SIZE;
+
 /**
  * Utility functions involving with parquet.
  */
-public class ParquetUtils extends BaseFileUtils {
+public class ParquetUtils extends FileFormatUtils {
 
   private static final Logger LOG = LoggerFactory.getLogger(ParquetUtils.class);
 
@@ -152,6 +160,14 @@ public class ParquetUtils extends BaseFileUtils {
     return rowKeys;
   }
 
+  /**
+   * @param codecName codec name in String.
+   * @return {@link CompressionCodecName} Enum.
+   */
+  public static CompressionCodecName getCompressionCodecName(String codecName) {
+    return CompressionCodecName.fromConf(StringUtils.isNullOrEmpty(codecName) ? null : codecName);
+  }
+
   /**
    * Fetch {@link HoodieKey}s with row positions from the given parquet file.
    *
@@ -366,6 +382,35 @@ public class ParquetUtils extends BaseFileUtils {
     }
   }
 
+  @Override
+  public byte[] serializeRecordsToLogBlock(StorageConfiguration<?> storageConf,
+                                           List<HoodieRecord> records,
+                                           Schema writerSchema,
+                                           Schema readerSchema,
+                                           String keyFieldName,
+                                           Map<String, String> paramsMap) throws IOException {
+    if (records.size() == 0) {
+      return new byte[0];
+    }
+
+    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+    HoodieConfig config = new HoodieConfig();
+    paramsMap.entrySet().stream().forEach(entry -> config.setValue(entry.getKey(), entry.getValue()));
+    config.setValue(PARQUET_BLOCK_SIZE.key(), String.valueOf(ParquetWriter.DEFAULT_BLOCK_SIZE));
+    config.setValue(PARQUET_PAGE_SIZE.key(), String.valueOf(ParquetWriter.DEFAULT_PAGE_SIZE));
+    config.setValue(PARQUET_MAX_FILE_SIZE.key(), String.valueOf(1024 * 1024 * 1024));
+    HoodieRecord.HoodieRecordType recordType = records.iterator().next().getRecordType();
+    try (HoodieFileWriter parquetWriter = HoodieFileWriterFactory.getFileWriter(
+        HoodieFileFormat.PARQUET, outputStream, storageConf, config, writerSchema, recordType)) {
+      for (HoodieRecord<?> record : records) {
+        String recordKey = record.getRecordKey(readerSchema, keyFieldName);
+        parquetWriter.write(recordKey, record, writerSchema);
+      }
+      outputStream.flush();
+    }
+    return outputStream.toByteArray();
+  }
+
   static class RecordKeysFilterFunction implements Function<String, Boolean> {
 
     private final Set<String> candidateKeys;
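
Similarly, a small sketch of the codec lookup helper added to ParquetUtils; the null/empty fallback is assumed to follow CompressionCodecName.fromConf, and the demo class is hypothetical:

    import org.apache.hudi.common.util.ParquetUtils;
    import org.apache.parquet.hadoop.metadata.CompressionCodecName;

    public class ParquetCodecSketch {
      public static void main(String[] args) {
        // Case-insensitive lookup of a configured codec name.
        CompressionCodecName gzip = ParquetUtils.getCompressionCodecName("gzip");
        // Null/empty names are delegated to fromConf(null).
        CompressionCodecName fallback = ParquetUtils.getCompressionCodecName("");
        System.out.println(gzip + " / " + fallback);
      }
    }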
diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroOrcReader.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroOrcReader.java
index 917b8a1a627..424be01b044 100644
--- a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroOrcReader.java
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroOrcReader.java
@@ -22,7 +22,7 @@ package org.apache.hudi.io.hadoop;
 import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.util.AvroOrcUtils;
-import org.apache.hudi.common.util.BaseFileUtils;
+import org.apache.hudi.common.util.FileFormatUtils;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieIOException;
@@ -53,12 +53,12 @@ public class HoodieAvroOrcReader extends HoodieAvroFileReader {
 
   private final StoragePath path;
   private final StorageConfiguration<?> conf;
-  private final BaseFileUtils orcUtils;
+  private final FileFormatUtils orcUtils;
 
   public HoodieAvroOrcReader(StorageConfiguration<?> configuration, StoragePath path) {
     this.conf = configuration;
     this.path = path;
-    this.orcUtils = BaseFileUtils.getInstance(HoodieFileFormat.ORC);
+    this.orcUtils = FileFormatUtils.getInstance(HoodieFileFormat.ORC);
   }
 
   @Override
diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReader.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReader.java
index d75660a9a7e..6736379b4df 100644
--- a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReader.java
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReader.java
@@ -24,7 +24,7 @@ import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.util.BaseFileUtils;
+import org.apache.hudi.common.util.FileFormatUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ParquetReaderIterator;
 import org.apache.hudi.common.util.collection.ClosableIterator;
@@ -59,7 +59,7 @@ public class HoodieAvroParquetReader extends HoodieAvroFileReader {
 
   private final StoragePath path;
   private final StorageConfiguration<?> conf;
-  private final BaseFileUtils parquetUtils;
+  private final FileFormatUtils parquetUtils;
   private final List<ParquetReaderIterator> readerIterators = new ArrayList<>();
 
   public HoodieAvroParquetReader(StorageConfiguration<?> storageConf, StoragePath path) {
@@ -67,7 +67,7 @@ public class HoodieAvroParquetReader extends HoodieAvroFileReader {
     // by the Reader (for proper config propagation to Parquet components)
     this.conf = tryOverrideDefaultConfigs(storageConf.newInstance());
     this.path = path;
-    this.parquetUtils = BaseFileUtils.getInstance(HoodieFileFormat.PARQUET);
+    this.parquetUtils = FileFormatUtils.getInstance(HoodieFileFormat.PARQUET);
   }
 
   @Override
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
index 792d28550e8..6db88b76005 100755
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
@@ -77,8 +77,6 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.io.compress.Compression;
-import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.apache.parquet.hadoop.util.counters.BenchmarkCounter;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
@@ -114,6 +112,8 @@ import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.apache.hudi.common.config.HoodieStorageConfig.HFILE_COMPRESSION_ALGORITHM_NAME;
+import static org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME;
 import static org.apache.hudi.common.testutils.HoodieTestUtils.getJavaVersion;
 import static org.apache.hudi.common.testutils.HoodieTestUtils.shouldUseExternalHdfs;
 import static org.apache.hudi.common.testutils.HoodieTestUtils.useExternalHdfs;
@@ -2775,9 +2775,9 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
       case AVRO_DATA_BLOCK:
         return new HoodieAvroDataBlock(records, false, header, HoodieRecord.RECORD_KEY_METADATA_FIELD);
       case HFILE_DATA_BLOCK:
-        return new HoodieHFileDataBlock(records, header, Compression.Algorithm.GZ, pathForReader, HoodieReaderConfig.USE_NATIVE_HFILE_READER.defaultValue());
+        return new HoodieHFileDataBlock(records, header, HFILE_COMPRESSION_ALGORITHM_NAME.defaultValue(), pathForReader, HoodieReaderConfig.USE_NATIVE_HFILE_READER.defaultValue());
       case PARQUET_DATA_BLOCK:
-        return new HoodieParquetDataBlock(records, false, header, HoodieRecord.RECORD_KEY_METADATA_FIELD, CompressionCodecName.GZIP, 0.1, true);
+        return new HoodieParquetDataBlock(records, false, header, HoodieRecord.RECORD_KEY_METADATA_FIELD, PARQUET_COMPRESSION_CODEC_NAME.defaultValue(), 0.1, true);
       default:
         throw new RuntimeException("Unknown data block type " + dataBlockType);
     }
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestHFileUtils.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestHFileUtils.java
new file mode 100644
index 00000000000..c88dced4ab3
--- /dev/null
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestHFileUtils.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+import java.util.Collections;
+import java.util.Map;
+
+import static org.apache.hudi.common.config.HoodieStorageConfig.HFILE_COMPRESSION_ALGORITHM_NAME;
+import static org.apache.hudi.common.util.HFileUtils.getHFileCompressionAlgorithm;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Tests {@link HFileUtils}
+ */
+public class TestHFileUtils {
+  @ParameterizedTest
+  @EnumSource(Compression.Algorithm.class)
+  public void testGetHFileCompressionAlgorithm(Compression.Algorithm algo) {
+    for (boolean upperCase : new boolean[] {true, false}) {
+      Map<String, String> paramsMap = Collections.singletonMap(
+          HFILE_COMPRESSION_ALGORITHM_NAME.key(),
+          upperCase ? algo.getName().toUpperCase() : algo.getName().toLowerCase());
+      assertEquals(algo, getHFileCompressionAlgorithm(paramsMap));
+    }
+  }
+
+  @Test
+  public void testGetHFileCompressionAlgorithmWithEmptyString() {
+    assertEquals(Compression.Algorithm.GZ, getHFileCompressionAlgorithm(
+        Collections.singletonMap(HFILE_COMPRESSION_ALGORITHM_NAME.key(), "")));
+  }
+
+  @Test
+  public void testGetDefaultHFileCompressionAlgorithm() {
+    assertEquals(Compression.Algorithm.GZ, getHFileCompressionAlgorithm(Collections.emptyMap()));
+  }
+}
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
index dbee36338c7..1e08979cc55 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
@@ -50,12 +50,10 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RawLocalFileSystem;
-import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.parquet.avro.AvroParquetWriter;
-import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 
 import java.io.File;
 import java.io.IOException;
@@ -71,6 +69,9 @@ import java.util.Objects;
 import java.util.UUID;
 import java.util.stream.Collectors;
 
+import static org.apache.hudi.common.config.HoodieStorageConfig.HFILE_COMPRESSION_ALGORITHM_NAME;
+import static org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME;
+
 public class InputFormatTestUtil {
 
   private static String TEST_WRITE_TOKEN = "1-0-1";
@@ -421,10 +422,10 @@ public class InputFormatTestUtil {
     List<HoodieRecord> hoodieRecords = records.stream().map(HoodieAvroIndexedRecord::new).collect(Collectors.toList());
     if (logBlockType == HoodieLogBlock.HoodieLogBlockType.HFILE_DATA_BLOCK) {
       dataBlock = new HoodieHFileDataBlock(
-          hoodieRecords, header, Compression.Algorithm.GZ, writer.getLogFile().getPath(), HoodieReaderConfig.USE_NATIVE_HFILE_READER.defaultValue());
+          hoodieRecords, header, HFILE_COMPRESSION_ALGORITHM_NAME.defaultValue(), writer.getLogFile().getPath(), HoodieReaderConfig.USE_NATIVE_HFILE_READER.defaultValue());
     } else if (logBlockType == HoodieLogBlock.HoodieLogBlockType.PARQUET_DATA_BLOCK) {
       dataBlock = new HoodieParquetDataBlock(hoodieRecords, false, header,
-          HoodieRecord.RECORD_KEY_METADATA_FIELD, CompressionCodecName.GZIP, 0.1, true);
+          HoodieRecord.RECORD_KEY_METADATA_FIELD, PARQUET_COMPRESSION_CODEC_NAME.defaultValue(), 0.1, true);
     } else {
       dataBlock = new HoodieAvroDataBlock(hoodieRecords, false, header,
           HoodieRecord.RECORD_KEY_METADATA_FIELD);
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/SparkHelpers.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/SparkHelpers.scala
index 791435f4bb7..c2a717e2764 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/SparkHelpers.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/SparkHelpers.scala
@@ -23,14 +23,14 @@ import org.apache.hudi.common.bloom.{BloomFilter, BloomFilterFactory}
 import org.apache.hudi.common.config.HoodieStorageConfig
 import org.apache.hudi.common.config.HoodieStorageConfig.{BLOOM_FILTER_DYNAMIC_MAX_ENTRIES, BLOOM_FILTER_FPP_VALUE, BLOOM_FILTER_NUM_ENTRIES_VALUE, BLOOM_FILTER_TYPE}
 import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord}
-import org.apache.hudi.common.util.{BaseFileUtils, Option}
+import org.apache.hudi.common.util.{FileFormatUtils, Option}
+import org.apache.hudi.io.hadoop.HoodieAvroParquetWriter
 import org.apache.hudi.io.storage.HoodieParquetConfig
 import org.apache.hudi.storage.{HoodieStorage, StorageConfiguration, StoragePath}
 
 import org.apache.avro.Schema
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.FileSystem
-import org.apache.hudi.io.hadoop.HoodieAvroParquetWriter
 import org.apache.parquet.avro.AvroSchemaConverter
 import org.apache.parquet.hadoop.metadata.CompressionCodecName
 import org.apache.spark.sql.{DataFrame, SQLContext}
@@ -48,7 +48,7 @@ object SparkHelpers {
                               sourceFile: StoragePath,
                               destinationFile: StoragePath,
                               keysToSkip: Set[String]) {
-    val sourceRecords = BaseFileUtils.getInstance(HoodieFileFormat.PARQUET).readAvroRecords(conf, sourceFile).asScala
+    val sourceRecords = FileFormatUtils.getInstance(HoodieFileFormat.PARQUET).readAvroRecords(conf, sourceFile).asScala
     val schema: Schema = sourceRecords.head.getSchema
     val filter: BloomFilter = BloomFilterFactory.createBloomFilter(
       BLOOM_FILTER_NUM_ENTRIES_VALUE.defaultValue.toInt, BLOOM_FILTER_FPP_VALUE.defaultValue.toDouble,
@@ -140,7 +140,7 @@ class SparkHelper(sqlContext: SQLContext, fs: FileSystem) {
    * @return
    */
   def fileKeysAgainstBF(conf: StorageConfiguration[_], sqlContext: SQLContext, file: String): Boolean = {
-    val bf = BaseFileUtils.getInstance(HoodieFileFormat.PARQUET).readBloomFilterFromMetadata(conf, new StoragePath(file))
+    val bf = FileFormatUtils.getInstance(HoodieFileFormat.PARQUET).readBloomFilterFromMetadata(conf, new StoragePath(file))
     val foundCount = sqlContext.parquetFile(file)
       .select(s"`${HoodieRecord.RECORD_KEY_METADATA_FIELD}`")
       .collect().count(r => !bf.mightContain(r.getString(0)))
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/ColumnStatsIndexHelper.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/ColumnStatsIndexHelper.java
index 11abebbb245..8ff46be7621 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/ColumnStatsIndexHelper.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/ColumnStatsIndexHelper.java
@@ -19,7 +19,7 @@ package org.apache.hudi;
 
 import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
 import org.apache.hudi.common.model.HoodieFileFormat;
-import org.apache.hudi.common.util.BaseFileUtils;
+import org.apache.hudi.common.util.FileFormatUtils;
 import org.apache.hudi.common.util.ParquetUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
@@ -174,7 +174,7 @@ public class ColumnStatsIndexHelper {
       colMinMaxInfos =
           jsc.parallelize(baseFilesPaths, numParallelism)
               .mapPartitions(paths -> {
-                ParquetUtils utils = (ParquetUtils) BaseFileUtils.getInstance(HoodieFileFormat.PARQUET);
+                ParquetUtils utils = (ParquetUtils) FileFormatUtils.getInstance(HoodieFileFormat.PARQUET);
                 Iterable<String> iterable = () -> paths;
                 return StreamSupport.stream(iterable.spliterator(), false)
                     .flatMap(path ->
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
index 55f0693d2c2..b9e72ed711d 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
@@ -49,9 +49,9 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.view.FileSystemViewManager;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
-import org.apache.hudi.common.util.BaseFileUtils;
 import org.apache.hudi.common.util.CleanerUtils;
 import org.apache.hudi.common.util.ConfigUtils;
+import org.apache.hudi.common.util.FileFormatUtils;
 import org.apache.hudi.common.util.FileIOUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.VisibleForTesting;
@@ -64,9 +64,9 @@ import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.io.storage.HoodieFileReader;
 import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.metadata.MetadataPartitionType;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;
-import org.apache.hudi.metadata.MetadataPartitionType;
 import org.apache.hudi.utilities.util.BloomFilterData;
 
 import com.beust.jcommander.JCommander;
@@ -1438,7 +1438,7 @@ public class HoodieMetadataTableValidator implements Serializable {
             .collect(Collectors.toList());
       } else {
         return baseFileNameList.stream().flatMap(filename ->
-                BaseFileUtils.getInstance(HoodieFileFormat.PARQUET).readColumnStatsFromMetadata(
+                FileFormatUtils.getInstance(HoodieFileFormat.PARQUET).readColumnStatsFromMetadata(
                     metaClient.getStorageConf(),
                     new StoragePath(FSUtils.constructAbsolutePath(metaClient.getBasePathV2(), partitionPath), filename),
                     allColumnNameList).stream())
