This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 318486662b7 [HUDI-9012] Implement and utilize native HFile writer 
(#12866)
318486662b7 is described below

commit 318486662b750bf24f0211fa37346612505d28e7
Author: Lin Liu <[email protected]>
AuthorDate: Sun May 25 11:23:45 2025 -0700

    [HUDI-9012] Implement and utilize native HFile writer (#12866)
    
    Co-authored-by: Y Ethan Guo <[email protected]>
---
 .../bootstrap/index/hfile/HFileBootstrapIndex.java |   7 +-
 .../index/hfile/HFileBootstrapIndexWriter.java     |  59 ++---
 .../org/apache/hudi/common/util/HFileUtils.java    |  73 +++---
 .../apache/hudi/common/util/TestHFileUtils.java    |  11 +-
 .../io/hadoop/HoodieAvroFileWriterFactory.java     |  15 +-
 .../hudi/io/hadoop/HoodieAvroHFileWriter.java      |  82 +++----
 .../apache/hudi/io/hadoop/HoodieHFileConfig.java   |  66 ++----
 .../hadoop/TestHoodieHBaseHFileReaderWriter.java   |   3 +-
 .../io/hadoop/TestHoodieHFileReaderWriterBase.java |   6 +-
 .../apache/hudi/io/compress/CompressionCodec.java  |  79 ++++++-
 ...odieDecompressor.java => HoodieCompressor.java} |  23 +-
 ...orFactory.java => HoodieCompressorFactory.java} |  19 +-
 ...essor.java => HoodieAirliftGzipCompressor.java} |  27 ++-
 ...Decompressor.java => HoodieNoneCompressor.java} |  17 +-
 .../org/apache/hudi/io/hfile/ChecksumType.java     |  91 +++++++
 .../java/org/apache/hudi/io/hfile/HFileBlock.java  | 123 +++++++++-
 .../org/apache/hudi/io/hfile/HFileBlockType.java   |   4 +
 .../org/apache/hudi/io/hfile/HFileContext.java     |  39 ++-
 .../org/apache/hudi/io/hfile/HFileDataBlock.java   |  78 +++++-
 .../apache/hudi/io/hfile/HFileFileInfoBlock.java   |  45 ++++
 .../org/apache/hudi/io/hfile/HFileIndexBlock.java  |  55 +++++
 .../java/org/apache/hudi/io/hfile/HFileInfo.java   |   4 +-
 .../apache/hudi/io/hfile/HFileLeafIndexBlock.java  |   7 +
 .../org/apache/hudi/io/hfile/HFileMetaBlock.java   |  28 +++
 .../apache/hudi/io/hfile/HFileMetaIndexBlock.java  |  59 +++++
 .../org/apache/hudi/io/hfile/HFileReaderImpl.java  |   8 +
 .../apache/hudi/io/hfile/HFileRootIndexBlock.java  |  47 +++-
 .../org/apache/hudi/io/hfile/HFileTrailer.java     |   3 +-
 .../java/org/apache/hudi/io/hfile/HFileUtils.java  |  40 ----
 .../{HFileMetaBlock.java => HFileWriter.java}      |  32 ++-
 .../org/apache/hudi/io/hfile/HFileWriterImpl.java  | 233 ++++++++++++++++++
 .../{HFileMetaBlock.java => KeyValueEntry.java}    |  23 +-
 .../org/apache/hudi/io/hfile/UTF8StringKey.java    |   1 -
 ...Decompressor.java => TestHoodieCompressor.java} |  22 +-
 .../org/apache/hudi/io/hfile/TestHFileWriter.java  | 263 +++++++++++++++++++++
 35 files changed, 1367 insertions(+), 325 deletions(-)

diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/bootstrap/index/hfile/HFileBootstrapIndex.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/bootstrap/index/hfile/HFileBootstrapIndex.java
index 9901aa1de7b..fd342288cf0 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/bootstrap/index/hfile/HFileBootstrapIndex.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/bootstrap/index/hfile/HFileBootstrapIndex.java
@@ -25,7 +25,6 @@ import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.storage.HoodieStorage;
@@ -144,9 +143,7 @@ public class HFileBootstrapIndex extends BootstrapIndex {
 
   @Override
   public BootstrapIndex.IndexWriter createWriter(String bootstrapBasePath) {
-    return (IndexWriter) 
ReflectionUtils.loadClass("org.apache.hudi.common.bootstrap.index.hfile.HBaseHFileBootstrapIndexWriter",
-        new Class<?>[] {String.class, HoodieTableMetaClient.class},
-        bootstrapBasePath, metaClient);
+    return new HFileBootstrapIndexWriter(bootstrapBasePath, metaClient);
   }
 
   @Override
@@ -155,7 +152,7 @@ public class HFileBootstrapIndex extends BootstrapIndex {
       StoragePath[] indexPaths = new StoragePath[] 
{partitionIndexPath(metaClient), fileIdIndexPath(metaClient)};
       for (StoragePath indexPath : indexPaths) {
         if (metaClient.getStorage().exists(indexPath)) {
-          LOG.info("Dropping bootstrap index. Deleting file : " + indexPath);
+          LOG.info("Dropping bootstrap index. Deleting file: {}", indexPath);
           metaClient.getStorage().deleteDirectory(indexPath);
         }
       }
diff --git 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/bootstrap/index/hfile/HBaseHFileBootstrapIndexWriter.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/bootstrap/index/hfile/HFileBootstrapIndexWriter.java
similarity index 76%
rename from 
hudi-hadoop-common/src/main/java/org/apache/hudi/common/bootstrap/index/hfile/HBaseHFileBootstrapIndexWriter.java
rename to 
hudi-common/src/main/java/org/apache/hudi/common/bootstrap/index/hfile/HFileBootstrapIndexWriter.java
index 9ffacdc6112..bcd063ff5d0 100644
--- 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/bootstrap/index/hfile/HBaseHFileBootstrapIndexWriter.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/bootstrap/index/hfile/HFileBootstrapIndexWriter.java
@@ -30,21 +30,16 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.io.hfile.HFileContext;
+import org.apache.hudi.io.hfile.HFileWriter;
+import org.apache.hudi.io.hfile.HFileWriterImpl;
 import org.apache.hudi.storage.StoragePath;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.io.hfile.CacheConfig;
-import org.apache.hadoop.hbase.io.hfile.HFile;
-import org.apache.hadoop.hbase.io.hfile.HFileContext;
-import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.io.OutputStream;
 import java.util.Collection;
 import java.util.Date;
 import java.util.HashMap;
@@ -52,21 +47,20 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
-import static 
org.apache.hudi.common.bootstrap.index.hfile.HFileBootstrapIndex.INDEX_INFO_KEY;
+import static 
org.apache.hudi.common.bootstrap.index.hfile.HFileBootstrapIndex.INDEX_INFO_KEY_STRING;
 import static 
org.apache.hudi.common.bootstrap.index.hfile.HFileBootstrapIndex.fileIdIndexPath;
 import static 
org.apache.hudi.common.bootstrap.index.hfile.HFileBootstrapIndex.getFileGroupKey;
 import static 
org.apache.hudi.common.bootstrap.index.hfile.HFileBootstrapIndex.getPartitionKey;
 import static 
org.apache.hudi.common.bootstrap.index.hfile.HFileBootstrapIndex.partitionIndexPath;
-import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
 
-public class HBaseHFileBootstrapIndexWriter extends BootstrapIndex.IndexWriter 
{
-  private static final Logger LOG = 
LoggerFactory.getLogger(HBaseHFileBootstrapIndexWriter.class);
+public class HFileBootstrapIndexWriter extends BootstrapIndex.IndexWriter {
+  private static final Logger LOG = 
LoggerFactory.getLogger(HFileBootstrapIndexWriter.class);
 
   private final String bootstrapBasePath;
   private final StoragePath indexByPartitionPath;
   private final StoragePath indexByFileIdPath;
-  private HFile.Writer indexByPartitionWriter;
-  private HFile.Writer indexByFileIdWriter;
+  private HFileWriter indexByPartitionWriter;
+  private HFileWriter indexByFileIdWriter;
 
   private boolean closed = false;
   private int numPartitionKeysAdded = 0;
@@ -74,7 +68,7 @@ public class HBaseHFileBootstrapIndexWriter extends 
BootstrapIndex.IndexWriter {
 
   private final Map<String, List<BootstrapFileMapping>> sourceFileMappings = 
new HashMap<>();
 
-  public HBaseHFileBootstrapIndexWriter(String bootstrapBasePath, 
HoodieTableMetaClient metaClient) {
+  public HFileBootstrapIndexWriter(String bootstrapBasePath, 
HoodieTableMetaClient metaClient) {
     super(metaClient);
     try {
       metaClient.initializeBootstrapDirsIfNotExists();
@@ -114,9 +108,7 @@ public class HBaseHFileBootstrapIndexWriter extends 
BootstrapIndex.IndexWriter {
               
m.getBootstrapFileStatus())).collect(Collectors.toMap(Pair::getKey, 
Pair::getValue)));
       Option<byte[]> bytes = 
TimelineMetadataUtils.serializeAvroMetadata(bootstrapPartitionMetadata, 
HoodieBootstrapPartitionMetadata.class);
       if (bytes.isPresent()) {
-        indexByPartitionWriter
-            .append(new KeyValue(getUTF8Bytes(getPartitionKey(partitionPath)), 
new byte[0], new byte[0],
-                HConstants.LATEST_TIMESTAMP, KeyValue.Type.Put, bytes.get()));
+        indexByPartitionWriter.append(getPartitionKey(partitionPath), 
bytes.get());
         numPartitionKeysAdded++;
       }
     } catch (IOException e) {
@@ -135,11 +127,10 @@ public class HBaseHFileBootstrapIndexWriter extends 
BootstrapIndex.IndexWriter {
       srcFilePartitionInfo.setPartitionPath(mapping.getPartitionPath());
       
srcFilePartitionInfo.setBootstrapPartitionPath(mapping.getBootstrapPartitionPath());
       
srcFilePartitionInfo.setBootstrapFileStatus(mapping.getBootstrapFileStatus());
-      KeyValue kv = new 
KeyValue(getUTF8Bytes(getFileGroupKey(mapping.getFileGroupId())), new byte[0], 
new byte[0],
-          HConstants.LATEST_TIMESTAMP, KeyValue.Type.Put,
-          TimelineMetadataUtils.serializeAvroMetadata(srcFilePartitionInfo,
-              HoodieBootstrapFilePartitionInfo.class).get());
-      indexByFileIdWriter.append(kv);
+      indexByFileIdWriter.append(
+          getFileGroupKey(mapping.getFileGroupId()),
+          TimelineMetadataUtils.serializeAvroMetadata(
+              srcFilePartitionInfo, 
HoodieBootstrapFilePartitionInfo.class).get());
       numFileIdKeysAdded++;
     } catch (IOException e) {
       throw new HoodieIOException(e.getMessage(), e);
@@ -166,9 +157,11 @@ public class HBaseHFileBootstrapIndexWriter extends 
BootstrapIndex.IndexWriter {
             .build();
         LOG.info("Appending FileId FileInfo :" + fileIdIndexInfo);
 
-        indexByPartitionWriter.appendFileInfo(INDEX_INFO_KEY,
+        indexByPartitionWriter.appendFileInfo(
+            INDEX_INFO_KEY_STRING,
             TimelineMetadataUtils.serializeAvroMetadata(partitionIndexInfo, 
HoodieBootstrapIndexInfo.class).get());
-        indexByFileIdWriter.appendFileInfo(INDEX_INFO_KEY,
+        indexByFileIdWriter.appendFileInfo(
+            INDEX_INFO_KEY_STRING,
             TimelineMetadataUtils.serializeAvroMetadata(fileIdIndexInfo, 
HoodieBootstrapIndexInfo.class).get());
 
         close();
@@ -196,15 +189,11 @@ public class HBaseHFileBootstrapIndexWriter extends 
BootstrapIndex.IndexWriter {
   @Override
   public void begin() {
     try {
-      HFileContext meta = new HFileContextBuilder().withCellComparator(new 
org.apache.hudi.common.bootstrap.index.HFileBootstrapIndex.HoodieKVComparator()).build();
-      this.indexByPartitionWriter = 
HFile.getWriterFactory(metaClient.getStorageConf().unwrapAs(Configuration.class),
-              new 
CacheConfig(metaClient.getStorageConf().unwrapAs(Configuration.class)))
-          .withPath((FileSystem) metaClient.getStorage().getFileSystem(), new 
Path(indexByPartitionPath.toUri()))
-          .withFileContext(meta).create();
-      this.indexByFileIdWriter = 
HFile.getWriterFactory(metaClient.getStorageConf().unwrapAs(Configuration.class),
-              new 
CacheConfig(metaClient.getStorageConf().unwrapAs(Configuration.class)))
-          .withPath((FileSystem) metaClient.getStorage().getFileSystem(), new 
Path(indexByFileIdPath.toUri()))
-          .withFileContext(meta).create();
+      HFileContext context = HFileContext.builder().build();
+      OutputStream outputStreamForPartitionWriter = 
metaClient.getStorage().create(indexByPartitionPath);
+      this.indexByPartitionWriter = new HFileWriterImpl(context, 
outputStreamForPartitionWriter);
+      OutputStream outputStreamForFileIdWriter = 
metaClient.getStorage().create(indexByFileIdPath);
+      this.indexByFileIdWriter = new HFileWriterImpl(context, 
outputStreamForFileIdWriter);
     } catch (IOException ioe) {
       throw new HoodieIOException(ioe.getMessage(), ioe);
     }
diff --git 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/HFileUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/common/util/HFileUtils.java
similarity index 82%
rename from 
hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/HFileUtils.java
rename to hudi-common/src/main/java/org/apache/hudi/common/util/HFileUtils.java
index 001a4dd3474..6238fbcb920 100644
--- 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/HFileUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/HFileUtils.java
@@ -29,9 +29,11 @@ import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.io.compress.CompressionCodec;
+import org.apache.hudi.io.hfile.HFileContext;
+import org.apache.hudi.io.hfile.HFileWriter;
+import org.apache.hudi.io.hfile.HFileWriterImpl;
 import org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase;
 import org.apache.hudi.io.storage.HoodieFileReader;
-import org.apache.hudi.io.storage.HoodieHBaseKVComparator;
 import org.apache.hudi.io.storage.HoodieIOFactory;
 import org.apache.hudi.keygen.BaseKeyGenerator;
 import org.apache.hudi.storage.HoodieStorage;
@@ -39,19 +41,13 @@ import org.apache.hudi.storage.StoragePath;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.io.compress.Compression;
-import org.apache.hadoop.hbase.io.hfile.CacheConfig;
-import org.apache.hadoop.hbase.io.hfile.HFile;
-import org.apache.hadoop.hbase.io.hfile.HFileContext;
-import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -70,17 +66,15 @@ public class HFileUtils extends FileFormatUtils {
   private static final int DEFAULT_BLOCK_SIZE_FOR_LOG_FILE = 1024 * 1024;
 
   /**
-   * Gets the {@link Compression.Algorithm} Enum based on the {@link 
CompressionCodec} name.
-   *
    * @param paramsMap parameter map containing the compression codec config.
-   * @return the {@link Compression.Algorithm} Enum.
+   * @return the {@link CompressionCodec} Enum.
    */
-  public static Compression.Algorithm getHFileCompressionAlgorithm(Map<String, 
String> paramsMap) {
-    String algoName = paramsMap.get(HFILE_COMPRESSION_ALGORITHM_NAME.key());
-    if (StringUtils.isNullOrEmpty(algoName)) {
-      return Compression.Algorithm.GZ;
+  public static CompressionCodec getHFileCompressionAlgorithm(Map<String, 
String> paramsMap) {
+    String codecName = paramsMap.get(HFILE_COMPRESSION_ALGORITHM_NAME.key());
+    if (StringUtils.isNullOrEmpty(codecName)) {
+      return CompressionCodec.GZIP;
     }
-    return Compression.Algorithm.valueOf(algoName.toUpperCase());
+    return CompressionCodec.findCodecByName(codecName);
   }
 
   @Override
@@ -167,17 +161,9 @@ public class HFileUtils extends FileFormatUtils {
                                            Schema readerSchema,
                                            String keyFieldName,
                                            Map<String, String> paramsMap) 
throws IOException {
-    Compression.Algorithm compressionAlgorithm = 
getHFileCompressionAlgorithm(paramsMap);
-    HFileContext context = new HFileContextBuilder()
-        .withBlockSize(DEFAULT_BLOCK_SIZE_FOR_LOG_FILE)
-        .withCompression(compressionAlgorithm)
-        .withCellComparator(new HoodieHBaseKVComparator())
-        .build();
-
-    Configuration conf = storage.getConf().unwrapAs(Configuration.class);
-    CacheConfig cacheConfig = new CacheConfig(conf);
+    CompressionCodec compressionCodec = 
getHFileCompressionAlgorithm(paramsMap);
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    FSDataOutputStream ostream = new FSDataOutputStream(baos, null);
+    OutputStream ostream = new DataOutputStream(baos);
 
     // Use simple incrementing counter as a key
     boolean useIntegerKey = !getRecordKey(records.get(0), readerSchema, 
keyFieldName).isPresent();
@@ -209,26 +195,25 @@ public class HFileUtils extends FileFormatUtils {
       sortedRecordsMap.put(recordKey, recordBytes);
     }
 
-    HFile.Writer writer = HFile.getWriterFactory(conf, cacheConfig)
-        .withOutputStream(ostream).withFileContext(context).create();
-
-    // Write the records
-    sortedRecordsMap.forEach((recordKey, recordBytes) -> {
-      try {
-        KeyValue kv = new KeyValue(recordKey.getBytes(), null, null, 
recordBytes);
-        writer.append(kv);
-      } catch (IOException e) {
-        throw new HoodieIOException("IOException serializing records", e);
-      }
-    });
-
-    writer.appendFileInfo(
-        getUTF8Bytes(HoodieAvroHFileReaderImplBase.SCHEMA_KEY), 
getUTF8Bytes(readerSchema.toString()));
+    HFileContext context = HFileContext.builder()
+        .blockSize(DEFAULT_BLOCK_SIZE_FOR_LOG_FILE)
+        .compressionCodec(compressionCodec)
+        .build();
+    try (HFileWriter writer = new HFileWriterImpl(context, ostream)) {
+      sortedRecordsMap.forEach((recordKey,recordBytes) -> {
+        try {
+          writer.append(recordKey, recordBytes);
+        } catch (IOException e) {
+          throw new HoodieIOException("IOException serializing records", e);
+        }
+      });
+      writer.appendFileInfo(
+          HoodieAvroHFileReaderImplBase.SCHEMA_KEY,
+          getUTF8Bytes(readerSchema.toString()));
+    }
 
-    writer.close();
     ostream.flush();
     ostream.close();
-
     return baos;
   }
 
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestHFileUtils.java
 b/hudi-common/src/test/java/org/apache/hudi/common/util/TestHFileUtils.java
similarity index 84%
rename from 
hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestHFileUtils.java
rename to 
hudi-common/src/test/java/org/apache/hudi/common/util/TestHFileUtils.java
index c88dced4ab3..fbf9b196f42 100644
--- 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestHFileUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestHFileUtils.java
@@ -19,7 +19,8 @@
 
 package org.apache.hudi.common.util;
 
-import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hudi.io.compress.CompressionCodec;
+
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
@@ -36,8 +37,8 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
  */
 public class TestHFileUtils {
   @ParameterizedTest
-  @EnumSource(Compression.Algorithm.class)
-  public void testGetHFileCompressionAlgorithm(Compression.Algorithm algo) {
+  @EnumSource(CompressionCodec.class)
+  public void testGetHFileCompressionAlgorithm(CompressionCodec algo) {
     for (boolean upperCase : new boolean[] {true, false}) {
       Map<String, String> paramsMap = Collections.singletonMap(
           HFILE_COMPRESSION_ALGORITHM_NAME.key(),
@@ -48,12 +49,12 @@ public class TestHFileUtils {
 
   @Test
   public void testGetHFileCompressionAlgorithmWithEmptyString() {
-    assertEquals(Compression.Algorithm.GZ, getHFileCompressionAlgorithm(
+    assertEquals(CompressionCodec.GZIP, getHFileCompressionAlgorithm(
         Collections.singletonMap(HFILE_COMPRESSION_ALGORITHM_NAME.key(), "")));
   }
 
   @Test
   public void testGetDefaultHFileCompressionAlgorithm() {
-    assertEquals(Compression.Algorithm.GZ, 
getHFileCompressionAlgorithm(Collections.emptyMap()));
+    assertEquals(CompressionCodec.GZIP, 
getHFileCompressionAlgorithm(Collections.emptyMap()));
   }
 }
diff --git 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroFileWriterFactory.java
 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroFileWriterFactory.java
index aaf583afefb..eac3f07681e 100644
--- 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroFileWriterFactory.java
+++ 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroFileWriterFactory.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.io.compress.CompressionCodec;
 import org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase;
 import org.apache.hudi.io.storage.HoodieFileWriter;
 import org.apache.hudi.io.storage.HoodieFileWriterFactory;
@@ -38,7 +39,6 @@ import org.apache.hudi.storage.StoragePath;
 import org.apache.avro.Schema;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.orc.CompressionKind;
 import org.apache.parquet.avro.AvroSchemaConverter;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
@@ -48,11 +48,6 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.util.Properties;
 
-import static org.apache.hudi.io.hadoop.HoodieHFileConfig.CACHE_DATA_IN_L1;
-import static 
org.apache.hudi.io.hadoop.HoodieHFileConfig.DROP_BEHIND_CACHE_COMPACTION;
-import static org.apache.hudi.io.hadoop.HoodieHFileConfig.HFILE_COMPARATOR;
-import static org.apache.hudi.io.hadoop.HoodieHFileConfig.PREFETCH_ON_OPEN;
-
 public class HoodieAvroFileWriterFactory extends HoodieFileWriterFactory {
 
   public HoodieAvroFileWriterFactory(HoodieStorage storage) {
@@ -98,13 +93,13 @@ public class HoodieAvroFileWriterFactory extends 
HoodieFileWriterFactory {
       String instantTime, StoragePath path, HoodieConfig config, Schema schema,
       TaskContextSupplier taskContextSupplier) throws IOException {
     BloomFilter filter = createBloomFilter(config);
-    HoodieHFileConfig hfileConfig = new 
HoodieHFileConfig(storage.getConf().unwrapAs(Configuration.class),
-        Compression.Algorithm.valueOf(
+    HoodieHFileConfig hfileConfig = new HoodieHFileConfig(
+        storage.getConf(),
+        CompressionCodec.findCodecByName(
             
config.getString(HoodieStorageConfig.HFILE_COMPRESSION_ALGORITHM_NAME)),
         config.getInt(HoodieStorageConfig.HFILE_BLOCK_SIZE),
         config.getLong(HoodieStorageConfig.HFILE_MAX_FILE_SIZE),
-        HoodieAvroHFileReaderImplBase.KEY_FIELD_NAME,
-        PREFETCH_ON_OPEN, CACHE_DATA_IN_L1, DROP_BEHIND_CACHE_COMPACTION, 
filter, HFILE_COMPARATOR);
+        HoodieAvroHFileReaderImplBase.KEY_FIELD_NAME, filter);
     return new HoodieAvroHFileWriter(instantTime, path, hfileConfig, schema, 
taskContextSupplier, config.getBoolean(HoodieTableConfig.POPULATE_META_FIELDS));
   }
 
diff --git 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroHFileWriter.java
 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroHFileWriter.java
index b81f57beb69..59ac7f3c352 100644
--- 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroHFileWriter.java
+++ 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroHFileWriter.java
@@ -27,9 +27,15 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieDuplicateKeyException;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.hadoop.fs.HoodieWrapperFileSystem;
+import org.apache.hudi.io.hfile.HFileContext;
+import org.apache.hudi.io.hfile.HFileWriter;
+import org.apache.hudi.io.hfile.HFileWriterImpl;
 import org.apache.hudi.io.storage.HoodieAvroFileWriter;
 import org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase;
+import org.apache.hudi.storage.HoodieStorageUtils;
+import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
@@ -37,17 +43,9 @@ import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.io.hfile.CacheConfig;
-import org.apache.hadoop.hbase.io.hfile.HFile;
-import org.apache.hadoop.hbase.io.hfile.HFileContext;
-import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
-import org.apache.hadoop.io.Writable;
 
-import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.util.concurrent.atomic.AtomicLong;
 
 import static org.apache.hudi.common.util.StringUtils.EMPTY_STRING;
@@ -73,18 +71,14 @@ public class HoodieAvroHFileWriter
   private final TaskContextSupplier taskContextSupplier;
   private final boolean populateMetaFields;
   private final Option<Schema.Field> keyFieldSchema;
-  private HFile.Writer writer;
+  private HFileWriter writer;
   private String minRecordKey;
   private String maxRecordKey;
   private String prevRecordKey;
 
-  // This is private in CacheConfig so have been copied here.
-  private static final String DROP_BEHIND_CACHE_COMPACTION_KEY = 
"hbase.hfile.drop.behind.compaction";
-
   public HoodieAvroHFileWriter(String instantTime, StoragePath file, 
HoodieHFileConfig hfileConfig, Schema schema,
                                TaskContextSupplier taskContextSupplier, 
boolean populateMetaFields) throws IOException {
-
-    Configuration conf = HadoopFSUtils.registerFileSystem(file, 
hfileConfig.getHadoopConf());
+    Configuration conf = HadoopFSUtils.registerFileSystem(file, 
(Configuration) hfileConfig.getStorageConf().unwrap());
     this.file = HoodieWrapperFileSystem.convertToHoodiePath(file, conf);
     FileSystem fs = this.file.getFileSystem(conf);
     this.isWrapperFileSystem = fs instanceof HoodieWrapperFileSystem;
@@ -101,25 +95,18 @@ public class HoodieAvroHFileWriter
     this.taskContextSupplier = taskContextSupplier;
     this.populateMetaFields = populateMetaFields;
 
-    HFileContext context = new 
HFileContextBuilder().withBlockSize(hfileConfig.getBlockSize())
-        .withCompression(hfileConfig.getCompressionAlgorithm())
-        .withCellComparator(hfileConfig.getHFileComparator())
+    HFileContext context = new HFileContext.Builder()
+        .blockSize(hfileConfig.getBlockSize())
+        .compressionCodec(hfileConfig.getCompressionCodec())
         .build();
+    StorageConfiguration<Configuration> storageConf = new 
HadoopStorageConfiguration(conf);
+    StoragePath filePath = new StoragePath(this.file.toUri());
+    OutputStream outputStream =  HoodieStorageUtils.getStorage(filePath, 
storageConf).create(filePath);
+    this.writer = new HFileWriterImpl(context, outputStream);
 
-    conf.set(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY,
-        String.valueOf(hfileConfig.shouldPrefetchBlocksOnOpen()));
-    conf.set(HColumnDescriptor.CACHE_DATA_IN_L1, 
String.valueOf(hfileConfig.shouldCacheDataInL1()));
-    conf.set(DROP_BEHIND_CACHE_COMPACTION_KEY,
-        String.valueOf(hfileConfig.shouldDropBehindCacheCompaction()));
-    CacheConfig cacheConfig = new CacheConfig(conf);
-    this.writer = HFile.getWriterFactory(conf, cacheConfig)
-        .withPath(fs, this.file)
-        .withFileContext(context)
-        .create();
-
-    
writer.appendFileInfo(getUTF8Bytes(HoodieAvroHFileReaderImplBase.SCHEMA_KEY),
-        getUTF8Bytes(schema.toString()));
     this.prevRecordKey = "";
+    writer.appendFileInfo(
+        HoodieAvroHFileReaderImplBase.SCHEMA_KEY, 
getUTF8Bytes(schema.toString()));
   }
 
   @Override
@@ -149,7 +136,8 @@ public class HoodieAvroHFileWriter
     if (keyFieldSchema.isPresent()) {
       GenericRecord keyExcludedRecord = (GenericRecord) record;
       int keyFieldPos = this.keyFieldSchema.get().pos();
-      boolean isKeyAvailable = (record.get(keyFieldPos) != null && 
!(record.get(keyFieldPos).toString().isEmpty()));
+      boolean isKeyAvailable = (record.get(keyFieldPos) != null
+          && !(record.get(keyFieldPos).toString().isEmpty()));
       if (isKeyAvailable) {
         Object originalKey = keyExcludedRecord.get(keyFieldPos);
         keyExcludedRecord.put(keyFieldPos, EMPTY_STRING);
@@ -161,9 +149,7 @@ public class HoodieAvroHFileWriter
     if (!isRecordSerialized) {
       value = HoodieAvroUtils.avroToBytes((GenericRecord) record);
     }
-
-    KeyValue kv = new KeyValue(getUTF8Bytes(recordKey), null, null, value);
-    writer.append(kv);
+    writer.append(recordKey, value);
 
     if (hfileConfig.useBloomFilter()) {
       hfileConfig.getBloomFilter().add(recordKey);
@@ -185,25 +171,17 @@ public class HoodieAvroHFileWriter
       if (maxRecordKey == null) {
         maxRecordKey = "";
       }
-      
writer.appendFileInfo(getUTF8Bytes(HoodieAvroHFileReaderImplBase.KEY_MIN_RECORD),
-          getUTF8Bytes(minRecordKey));
-      
writer.appendFileInfo(getUTF8Bytes(HoodieAvroHFileReaderImplBase.KEY_MAX_RECORD),
-          getUTF8Bytes(maxRecordKey));
-      
writer.appendFileInfo(getUTF8Bytes(HoodieAvroHFileReaderImplBase.KEY_BLOOM_FILTER_TYPE_CODE),
+      writer.appendFileInfo(
+          HoodieAvroHFileReaderImplBase.KEY_MIN_RECORD, 
getUTF8Bytes(minRecordKey));
+      writer.appendFileInfo(
+          HoodieAvroHFileReaderImplBase.KEY_MAX_RECORD, 
getUTF8Bytes(maxRecordKey));
+      writer.appendFileInfo(
+          HoodieAvroHFileReaderImplBase.KEY_BLOOM_FILTER_TYPE_CODE,
           getUTF8Bytes(bloomFilter.getBloomFilterTypeCode().toString()));
-      
writer.appendMetaBlock(HoodieAvroHFileReaderImplBase.KEY_BLOOM_FILTER_META_BLOCK,
-          new Writable() {
-            @Override
-            public void write(DataOutput out) throws IOException {
-              out.write(getUTF8Bytes(bloomFilter.serializeToString()));
-            }
-
-            @Override
-            public void readFields(DataInput in) throws IOException {
-            }
-          });
+      writer.appendMetaInfo(
+          HoodieAvroHFileReaderImplBase.KEY_BLOOM_FILTER_META_BLOCK,
+          getUTF8Bytes(bloomFilter.serializeToString()));
     }
-
     writer.close();
     writer = null;
   }
diff --git 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieHFileConfig.java
 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieHFileConfig.java
index 83b659a6be0..37a225f734e 100644
--- 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieHFileConfig.java
+++ 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieHFileConfig.java
@@ -20,55 +20,37 @@
 package org.apache.hudi.io.hadoop;
 
 import org.apache.hudi.common.bloom.BloomFilter;
-import org.apache.hudi.io.storage.HoodieHBaseKVComparator;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.CellComparator;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.io.compress.Compression;
-import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hudi.io.compress.CompressionCodec;
+import org.apache.hudi.storage.StorageConfiguration;
 
 public class HoodieHFileConfig {
-
-  public static final CellComparator HFILE_COMPARATOR = new 
HoodieHBaseKVComparator();
-  public static final boolean PREFETCH_ON_OPEN = 
CacheConfig.DEFAULT_PREFETCH_ON_OPEN;
-  public static final boolean CACHE_DATA_IN_L1 = 
HColumnDescriptor.DEFAULT_CACHE_DATA_IN_L1;
-  // This is private in CacheConfig so have been copied here.
-  public static final boolean DROP_BEHIND_CACHE_COMPACTION = true;
-
-  private final Compression.Algorithm compressionAlgorithm;
+  private final CompressionCodec compressionCodec;
   private final int blockSize;
   private final long maxFileSize;
-  private final boolean prefetchBlocksOnOpen;
-  private final boolean cacheDataInL1;
-  private final boolean dropBehindCacheCompaction;
-  private final Configuration hadoopConf;
+  private final StorageConfiguration storageConf;
   private final BloomFilter bloomFilter;
-  private final CellComparator hfileComparator;
   private final String keyFieldName;
 
-  public HoodieHFileConfig(Configuration hadoopConf, Compression.Algorithm 
compressionAlgorithm, int blockSize,
-                           long maxFileSize, String keyFieldName,
-                           boolean prefetchBlocksOnOpen, boolean 
cacheDataInL1, boolean dropBehindCacheCompaction,
-                           BloomFilter bloomFilter, CellComparator 
hfileComparator) {
-    this.hadoopConf = hadoopConf;
-    this.compressionAlgorithm = compressionAlgorithm;
+  public HoodieHFileConfig(StorageConfiguration storageConf,
+                           CompressionCodec compressionCodec,
+                           int blockSize,
+                           long maxFileSize,
+                           String keyFieldName,
+                           BloomFilter bloomFilter) {
+    this.storageConf = storageConf;
+    this.compressionCodec = compressionCodec;
     this.blockSize = blockSize;
     this.maxFileSize = maxFileSize;
-    this.prefetchBlocksOnOpen = prefetchBlocksOnOpen;
-    this.cacheDataInL1 = cacheDataInL1;
-    this.dropBehindCacheCompaction = dropBehindCacheCompaction;
     this.bloomFilter = bloomFilter;
-    this.hfileComparator = hfileComparator;
     this.keyFieldName = keyFieldName;
   }
 
-  public Configuration getHadoopConf() {
-    return hadoopConf;
+  public StorageConfiguration getStorageConf() {
+    return storageConf;
   }
 
-  public Compression.Algorithm getCompressionAlgorithm() {
-    return compressionAlgorithm;
+  public CompressionCodec getCompressionCodec() {
+    return compressionCodec;
   }
 
   public int getBlockSize() {
@@ -79,18 +61,6 @@ public class HoodieHFileConfig {
     return maxFileSize;
   }
 
-  public boolean shouldPrefetchBlocksOnOpen() {
-    return prefetchBlocksOnOpen;
-  }
-
-  public boolean shouldCacheDataInL1() {
-    return cacheDataInL1;
-  }
-
-  public boolean shouldDropBehindCacheCompaction() {
-    return dropBehindCacheCompaction;
-  }
-
   public boolean useBloomFilter() {
     return bloomFilter != null;
   }
@@ -99,10 +69,6 @@ public class HoodieHFileConfig {
     return bloomFilter;
   }
 
-  public CellComparator getHFileComparator() {
-    return hfileComparator;
-  }
-
   public String getKeyFieldName() {
     return keyFieldName;
   }
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHBaseHFileReaderWriter.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHBaseHFileReaderWriter.java
index e0d307e2f03..ee1138800a6 100644
--- 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHBaseHFileReaderWriter.java
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHBaseHFileReaderWriter.java
@@ -56,7 +56,8 @@ import static 
org.apache.hudi.io.hfile.TestHFileReader.VALUE_CREATOR;
 import static 
org.apache.hudi.io.storage.TestHoodieReaderWriterUtils.writeHFileForTesting;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
-public class TestHoodieHBaseHFileReaderWriter extends 
TestHoodieHFileReaderWriterBase {
+@Disabled("HUDI-9084")
+class TestHoodieHBaseHFileReaderWriter extends TestHoodieHFileReaderWriterBase 
{
   @Override
   protected HoodieAvroFileReader createReader(
       HoodieStorage storage) throws Exception {
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHFileReaderWriterBase.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHFileReaderWriterBase.java
index 9fce54b82fe..8c319298a24 100644
--- 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHFileReaderWriterBase.java
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHFileReaderWriterBase.java
@@ -33,6 +33,7 @@ import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.io.storage.HoodieAvroFileReader;
 import org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase;
 import org.apache.hudi.io.storage.HoodieFileWriterFactory;
+import org.apache.hudi.io.storage.HoodieHBaseKVComparator;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;
 
@@ -73,7 +74,6 @@ import java.util.stream.StreamSupport;
 import static org.apache.hudi.common.testutils.FileSystemTestUtils.RANDOM;
 import static 
org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource;
 import static org.apache.hudi.common.util.CollectionUtils.toStream;
-import static org.apache.hudi.io.hadoop.HoodieHFileConfig.HFILE_COMPARATOR;
 import static 
org.apache.hudi.io.hfile.TestHFileReader.BOOTSTRAP_INDEX_HFILE_SUFFIX;
 import static 
org.apache.hudi.io.hfile.TestHFileReader.COMPLEX_SCHEMA_HFILE_SUFFIX;
 import static 
org.apache.hudi.io.hfile.TestHFileReader.SIMPLE_SCHEMA_HFILE_SUFFIX;
@@ -485,7 +485,7 @@ public abstract class TestHoodieHFileReaderWriterBase 
extends TestHoodieReaderWr
     FileSystem fs = HadoopFSUtils.getFs(getFilePath().toString(), new 
Configuration());
     byte[] content = readHFileFromResources(simpleHFile);
     verifyHFileReader(
-        content, hfilePrefix, true, HFILE_COMPARATOR.getClass(), 
NUM_RECORDS_FIXTURE);
+        content, hfilePrefix, true, HoodieHBaseKVComparator.class, 
NUM_RECORDS_FIXTURE);
 
     HoodieStorage storage = HoodieTestUtils.getStorage(getFilePath());
     try (HoodieAvroHFileReaderImplBase hfileReader = 
createHFileReader(storage, content)) {
@@ -497,7 +497,7 @@ public abstract class TestHoodieHFileReaderWriterBase 
extends TestHoodieReaderWr
 
     content = readHFileFromResources(complexHFile);
     verifyHFileReader(
-        content, hfilePrefix, true, HFILE_COMPARATOR.getClass(), 
NUM_RECORDS_FIXTURE);
+        content, hfilePrefix, true, HoodieHBaseKVComparator.class, 
NUM_RECORDS_FIXTURE);
     try (HoodieAvroHFileReaderImplBase hfileReader = 
createHFileReader(storage, content)) {
       Schema avroSchema =
           getSchemaFromResource(TestHoodieReaderWriterBase.class, 
"/exampleSchemaWithUDT.avsc");
diff --git 
a/hudi-io/src/main/java/org/apache/hudi/io/compress/CompressionCodec.java 
b/hudi-io/src/main/java/org/apache/hudi/io/compress/CompressionCodec.java
index d9c933cdc08..6ddf004b94f 100644
--- a/hudi-io/src/main/java/org/apache/hudi/io/compress/CompressionCodec.java
+++ b/hudi-io/src/main/java/org/apache/hudi/io/compress/CompressionCodec.java
@@ -19,26 +19,89 @@
 
 package org.apache.hudi.io.compress;
 
+import org.apache.hudi.common.util.ValidationUtils;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
 /**
  * Available compression codecs.
  * There should not be any assumption on the ordering or ordinal of the 
defined enums.
  */
 public enum CompressionCodec {
-  NONE("none"),
-  BZIP2("bz2"),
-  GZIP("gz"),
-  LZ4("lz4"),
-  LZO("lzo"),
-  SNAPPY("snappy"),
-  ZSTD("zstd");
+  NONE("none", 2),
+  BZIP2("bz2", 5),
+  GZIP("gz", 1),
+  LZ4("lz4", 4),
+  LZO("lzo", 0),
+  SNAPPY("snappy", 3),
+  ZSTD("zstd", 6);
+
+  private static final Map<String, CompressionCodec>
+      NAME_TO_COMPRESSION_CODEC_MAP = createNameToCompressionCodecMap();
+  private static final Map<Integer, CompressionCodec>
+      ID_TO_COMPRESSION_CODEC_MAP = createIdToCompressionCodecMap();
 
   private final String name;
+  // CompressionCodec ID to be stored in HFile on storage
+  // The ID of each codec cannot change or else that breaks all existing 
HFiles out there
+  // even the ones that are not compressed! (They use the NONE algorithm)
+  private final int id;
 
-  CompressionCodec(final String name) {
+  CompressionCodec(final String name, int id) {
     this.name = name;
+    this.id = id;
   }
 
   public String getName() {
     return name;
   }
+
+  public int getId() {
+    return id;
+  }
+
+  public static CompressionCodec findCodecByName(String name) {
+    CompressionCodec codec =
+        NAME_TO_COMPRESSION_CODEC_MAP.get(name.toLowerCase());
+    ValidationUtils.checkArgument(
+        codec != null, String.format("Cannot find compression codec: %s", 
name));
+    return codec;
+  }
+
+  /**
+   * Gets the compression codec based on the ID.  This ID is written to the 
HFile on storage.
+   *
+   * @param id ID indicating the compression codec
+   * @return compression codec based on the ID
+   */
+  public static CompressionCodec decodeCompressionCodec(int id) {
+    CompressionCodec codec = ID_TO_COMPRESSION_CODEC_MAP.get(id);
+    ValidationUtils.checkArgument(
+        codec != null, "Compression code not found for ID: " + id);
+    return codec;
+  }
+
+  /**
+   * @return the mapping of name to compression codec.
+   */
+  private static Map<String, CompressionCodec> 
createNameToCompressionCodecMap() {
+    return Collections.unmodifiableMap(
+        Arrays.stream(CompressionCodec.values())
+            .collect(Collectors.toMap(CompressionCodec::getName, 
Function.identity()))
+    );
+  }
+
+  /**
+   * @return the mapping of ID to compression codec.
+   */
+  private static Map<Integer, CompressionCodec> 
createIdToCompressionCodecMap() {
+    return Collections.unmodifiableMap(
+        Arrays.stream(CompressionCodec.values())
+            .collect(Collectors.toMap(CompressionCodec::getId, 
Function.identity()))
+    );
+  }
 }
diff --git 
a/hudi-io/src/main/java/org/apache/hudi/io/compress/HoodieDecompressor.java 
b/hudi-io/src/main/java/org/apache/hudi/io/compress/HoodieCompressor.java
similarity index 70%
rename from 
hudi-io/src/main/java/org/apache/hudi/io/compress/HoodieDecompressor.java
rename to 
hudi-io/src/main/java/org/apache/hudi/io/compress/HoodieCompressor.java
index 62be2747003..5ad0f71d12c 100644
--- a/hudi-io/src/main/java/org/apache/hudi/io/compress/HoodieDecompressor.java
+++ b/hudi-io/src/main/java/org/apache/hudi/io/compress/HoodieCompressor.java
@@ -21,11 +21,12 @@ package org.apache.hudi.io.compress;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.ByteBuffer;
 
 /**
- * Provides decompression on input data.
+ * Provides compression and decompression on input data.
  */
-public interface HoodieDecompressor {
+public interface HoodieCompressor {
   /**
    * Decompresses the data from {@link InputStream} and writes the 
decompressed data to the target
    * byte array.
@@ -41,4 +42,22 @@ public interface HoodieDecompressor {
                  byte[] targetByteArray,
                  int offset,
                  int length) throws IOException;
+
+  /**
+   * Compresses data stored in byte array.
+   *
+   * @param uncompressedBytes  input data in byte array
+   * @return output data in byte array
+   * @throws IOException       upon error
+   */
+  byte[] compress(byte[] uncompressedBytes) throws IOException;
+
+  /**
+   * Compresses data stored in {@link ByteBuffer}.
+   *
+   * @param uncompressedBytes  input data in {@link ByteBuffer}
+   * @return output data in {@link ByteBuffer}
+   * @throws IOException       upon error
+   */
+  ByteBuffer compress(ByteBuffer uncompressedBytes) throws IOException;
 }
diff --git 
a/hudi-io/src/main/java/org/apache/hudi/io/compress/HoodieDecompressorFactory.java
 
b/hudi-io/src/main/java/org/apache/hudi/io/compress/HoodieCompressorFactory.java
similarity index 65%
rename from 
hudi-io/src/main/java/org/apache/hudi/io/compress/HoodieDecompressorFactory.java
rename to 
hudi-io/src/main/java/org/apache/hudi/io/compress/HoodieCompressorFactory.java
index af50b094079..9d56e69d705 100644
--- 
a/hudi-io/src/main/java/org/apache/hudi/io/compress/HoodieDecompressorFactory.java
+++ 
b/hudi-io/src/main/java/org/apache/hudi/io/compress/HoodieCompressorFactory.java
@@ -19,22 +19,25 @@
 
 package org.apache.hudi.io.compress;
 
-import org.apache.hudi.io.compress.airlift.HoodieAirliftGzipDecompressor;
-import org.apache.hudi.io.compress.builtin.HoodieNoneDecompressor;
+import org.apache.hudi.io.compress.airlift.HoodieAirliftGzipCompressor;
+import org.apache.hudi.io.compress.builtin.HoodieNoneCompressor;
 
 /**
- * Factory for {@link HoodieDecompressor}.
+ * Factory for {@link HoodieCompressor}.
  */
-public class HoodieDecompressorFactory {
-  public static HoodieDecompressor getDecompressor(CompressionCodec 
compressionCodec) {
+public class HoodieCompressorFactory {
+  private HoodieCompressorFactory() {
+  }
+
+  public static HoodieCompressor getCompressor(CompressionCodec 
compressionCodec) {
     switch (compressionCodec) {
       case NONE:
-        return new HoodieNoneDecompressor();
+        return new HoodieNoneCompressor();
       case GZIP:
-        return new HoodieAirliftGzipDecompressor();
+        return new HoodieAirliftGzipCompressor();
       default:
         throw new IllegalArgumentException(
-            "The decompression is not supported for compression codec: " + 
compressionCodec);
+            "The compressor is not supported for compression codec: " + 
compressionCodec);
     }
   }
 }
diff --git 
a/hudi-io/src/main/java/org/apache/hudi/io/compress/airlift/HoodieAirliftGzipDecompressor.java
 
b/hudi-io/src/main/java/org/apache/hudi/io/compress/airlift/HoodieAirliftGzipCompressor.java
similarity index 63%
rename from 
hudi-io/src/main/java/org/apache/hudi/io/compress/airlift/HoodieAirliftGzipDecompressor.java
rename to 
hudi-io/src/main/java/org/apache/hudi/io/compress/airlift/HoodieAirliftGzipCompressor.java
index 15c2ff3f827..2216f581477 100644
--- 
a/hudi-io/src/main/java/org/apache/hudi/io/compress/airlift/HoodieAirliftGzipDecompressor.java
+++ 
b/hudi-io/src/main/java/org/apache/hudi/io/compress/airlift/HoodieAirliftGzipCompressor.java
@@ -20,24 +20,27 @@
 package org.apache.hudi.io.compress.airlift;
 
 import org.apache.hudi.io.compress.CompressionCodec;
-import org.apache.hudi.io.compress.HoodieDecompressor;
+import org.apache.hudi.io.compress.HoodieCompressor;
 
 import io.airlift.compress.gzip.JdkGzipHadoopStreams;
 import io.airlift.compress.hadoop.HadoopInputStream;
+import io.airlift.compress.hadoop.HadoopOutputStream;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.ByteBuffer;
 
 import static org.apache.hudi.io.util.IOUtils.readFully;
 
 /**
- * Implementation of {@link HoodieDecompressor} for {@link 
CompressionCodec#GZIP} compression
+ * Implementation of {@link HoodieCompressor} for {@link 
CompressionCodec#GZIP} compression
  * codec using airlift aircompressor's GZIP decompressor.
  */
-public class HoodieAirliftGzipDecompressor implements HoodieDecompressor {
+public class HoodieAirliftGzipCompressor implements HoodieCompressor {
   private final JdkGzipHadoopStreams gzipStreams;
 
-  public HoodieAirliftGzipDecompressor() {
+  public HoodieAirliftGzipCompressor() {
     gzipStreams = new JdkGzipHadoopStreams();
   }
 
@@ -50,4 +53,20 @@ public class HoodieAirliftGzipDecompressor implements 
HoodieDecompressor {
       return readFully(stream, targetByteArray, offset, length);
     }
   }
+
+  @Override
+  public byte[] compress(byte[] data) throws IOException {
+    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+    try (HadoopOutputStream gzipOutputStream = 
gzipStreams.createOutputStream(byteArrayOutputStream)) {
+      gzipOutputStream.write(data);
+    }
+    return byteArrayOutputStream.toByteArray();
+  }
+
+  @Override
+  public ByteBuffer compress(ByteBuffer uncompressedBytes) throws IOException {
+    byte[] temp = new byte[uncompressedBytes.remaining()];
+    uncompressedBytes.get(temp);
+    return ByteBuffer.wrap(this.compress(temp));
+  }
 }
diff --git 
a/hudi-io/src/main/java/org/apache/hudi/io/compress/builtin/HoodieNoneDecompressor.java
 
b/hudi-io/src/main/java/org/apache/hudi/io/compress/builtin/HoodieNoneCompressor.java
similarity index 74%
rename from 
hudi-io/src/main/java/org/apache/hudi/io/compress/builtin/HoodieNoneDecompressor.java
rename to 
hudi-io/src/main/java/org/apache/hudi/io/compress/builtin/HoodieNoneCompressor.java
index d702201c6dd..a402d29bb52 100644
--- 
a/hudi-io/src/main/java/org/apache/hudi/io/compress/builtin/HoodieNoneDecompressor.java
+++ 
b/hudi-io/src/main/java/org/apache/hudi/io/compress/builtin/HoodieNoneCompressor.java
@@ -20,18 +20,19 @@
 package org.apache.hudi.io.compress.builtin;
 
 import org.apache.hudi.io.compress.CompressionCodec;
-import org.apache.hudi.io.compress.HoodieDecompressor;
+import org.apache.hudi.io.compress.HoodieCompressor;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.ByteBuffer;
 
 import static org.apache.hudi.io.util.IOUtils.readFully;
 
 /**
- * Implementation of {@link HoodieDecompressor} for {@link 
CompressionCodec#NONE} compression
+ * Implementation of {@link HoodieCompressor} for {@link 
CompressionCodec#NONE} compression
  * codec (no compression) by directly reading the input stream.
  */
-public class HoodieNoneDecompressor implements HoodieDecompressor {
+public class HoodieNoneCompressor implements HoodieCompressor {
   @Override
   public int decompress(InputStream compressedInput,
                         byte[] targetByteArray,
@@ -39,4 +40,14 @@ public class HoodieNoneDecompressor implements 
HoodieDecompressor {
                         int length) throws IOException {
     return readFully(compressedInput, targetByteArray, offset, length);
   }
+
+  @Override
+  public byte[] compress(byte[] uncompressedBytes) {
+    return uncompressedBytes;
+  }
+
+  @Override
+  public ByteBuffer compress(ByteBuffer uncompressedBytes) throws IOException {
+    return uncompressedBytes;
+  }
 }
diff --git a/hudi-io/src/main/java/org/apache/hudi/io/hfile/ChecksumType.java 
b/hudi-io/src/main/java/org/apache/hudi/io/hfile/ChecksumType.java
new file mode 100644
index 00000000000..1642370f161
--- /dev/null
+++ b/hudi-io/src/main/java/org/apache/hudi/io/hfile/ChecksumType.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io.hfile;
+
+/**
+ * Type of checksum used to validate the integrity of data block.
+ * It determines the number of bytes used for checksum.
+ */
+public enum ChecksumType {
+
+  NULL((byte) 0) {
+    @Override
+    public String getName() {
+      return "NULL";
+    }
+  },
+
+  CRC32((byte) 1) {
+    @Override
+    public String getName() {
+      return "CRC32";
+    }
+  },
+
+  CRC32C((byte) 2) {
+    @Override
+    public String getName() {
+      return "CRC32C";
+    }
+  };
+
+  private final byte code;
+
+  public static ChecksumType getDefaultChecksumType() {
+    return ChecksumType.CRC32C;
+  }
+
+  /** returns the name of this checksum type */
+  public abstract String getName();
+
+  private ChecksumType(final byte c) {
+    this.code = c;
+  }
+
+  public byte getCode() {
+    return this.code;
+  }
+
+  /**
+   * Use designated byte value to indicate checksum type.
+   * @return type associated with passed code
+   */
+  public static ChecksumType codeToType(final byte b) {
+    for (ChecksumType t : ChecksumType.values()) {
+      if (t.getCode() == b) {
+        return t;
+      }
+    }
+    throw new RuntimeException("Unknown checksum type code " + b);
+  }
+
+  /**
+   * Map a checksum name to a specific type.
+   * @return type associated with the name
+   */
+  public static ChecksumType nameToType(final String name) {
+    for (ChecksumType t : ChecksumType.values()) {
+      if (t.getName().equals(name)) {
+        return t;
+      }
+    }
+    throw new RuntimeException("Unknown checksum type name " + name);
+  }
+}
diff --git a/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileBlock.java 
b/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileBlock.java
index 1723b482a95..14290362e48 100644
--- a/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileBlock.java
+++ b/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileBlock.java
@@ -19,11 +19,16 @@
 
 package org.apache.hudi.io.hfile;
 
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.io.compress.CompressionCodec;
 
+import com.google.protobuf.CodedOutputStream;
+
 import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.ByteBuffer;
 
 import static org.apache.hudi.io.hfile.DataSize.MAGIC_LENGTH;
 import static org.apache.hudi.io.hfile.DataSize.SIZEOF_BYTE;
@@ -43,9 +48,10 @@ public abstract class HFileBlock {
   // followed by another 4 byte value to store sizeofDataOnDisk.
   public static final int HFILEBLOCK_HEADER_SIZE =
       HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM + SIZEOF_BYTE + 2 * SIZEOF_INT32;
-
   // Each checksum value is an integer that can be stored in 4 bytes.
   static final int CHECKSUM_SIZE = SIZEOF_INT32;
+  private static final int DEFAULT_BYTES_PER_CHECKSUM = 16 * 1024;
+  private static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
 
   static class Header {
     // Format of header is:
@@ -80,6 +86,13 @@ public abstract class HFileBlock {
   protected byte[] compressedByteBuff;
   protected int startOffsetInCompressedBuff;
 
+  // Write properties
+  private long startOffsetInBuffForWrite = -1;
+  private long previousBlockOffsetForWrite = -1;
+
+  /**
+   * Initialize HFileBlock for read.
+   */
   protected HFileBlock(HFileContext context,
                        HFileBlockType blockType,
                        byte[] byteBuff,
@@ -107,6 +120,25 @@ public abstract class HFileBlock {
         this.startOffsetInBuff + HFILEBLOCK_HEADER_SIZE + 
uncompressedSizeWithoutHeader;
   }
 
+  /**
+   * Initialize HFileBlock for write.
+   */
+  protected HFileBlock(HFileContext context,
+                       HFileBlockType blockType,
+                       long previousBlockOffsetForWrite) {
+    this.context = context;
+    this.blockType = blockType;
+    this.previousBlockOffsetForWrite = previousBlockOffsetForWrite;
+    // Set other reader properties to invalid values
+    this.byteBuff = null;
+    this.startOffsetInBuff = -1;
+    this.sizeCheckSum = -1;
+    this.uncompressedEndOffset = -1;
+    this.onDiskSizeWithoutHeader = -1;
+    this.uncompressedSizeWithoutHeader = -1;
+    this.bytesPerChecksum = -1;
+  }
+
   /**
    * Parses the HFile block header and returns the {@link HFileBlock} instance 
based on the input.
    *
@@ -196,7 +228,7 @@ public abstract class HFileBlock {
             compressedByteBuff, startOffsetInCompressedBuff, byteBuff, 0, 
HFILEBLOCK_HEADER_SIZE);
         try (InputStream byteBuffInputStream = new ByteArrayInputStream(
             compressedByteBuff, startOffsetInCompressedBuff + 
HFILEBLOCK_HEADER_SIZE, onDiskSizeWithoutHeader)) {
-          context.getDecompressor().decompress(
+          context.getCompressor().decompress(
               byteBuffInputStream,
               byteBuff,
               HFILEBLOCK_HEADER_SIZE,
@@ -217,4 +249,91 @@ public abstract class HFileBlock {
     int capacity = HFILEBLOCK_HEADER_SIZE + uncompressedSizeWithoutHeader + 
sizeCheckSum;
     return new byte[capacity];
   }
+
+  // ================ Below are for Write ================
+
+  /**
+   * Returns serialized "data" part of the block.
+   * This function must be implemented by each block type separately.
+   */
+  protected abstract ByteBuffer getUncompressedBlockDataToWrite();
+
+  /**
+   * Return serialized block including header, data, checksum.
+   */
+  public ByteBuffer serialize() throws IOException {
+    // Block payload.
+    ByteBuffer uncompressedBlockData = getUncompressedBlockDataToWrite();
+    // Compress if specified.
+    ByteBuffer compressedBlockData = 
context.getCompressor().compress(uncompressedBlockData);
+    // Buffer for building block.
+    ByteBuffer buf = ByteBuffer.allocate(Math.max(
+        context.getBlockSize() * 2,
+        compressedBlockData.limit() + HFILEBLOCK_HEADER_SIZE * 2));
+
+    // Block header
+    // 1. Magic is always 8 bytes.
+    buf.put(blockType.getMagic(), 0, 8);
+    // 2. onDiskSizeWithoutHeader.
+    buf.putInt(compressedBlockData.limit());
+    // 3. uncompressedSizeWithoutHeader.
+    buf.putInt(uncompressedBlockData.limit());
+    // 4. Previous block offset.
+    buf.putLong(previousBlockOffsetForWrite);
+    // 5. Checksum type.
+    buf.put(context.getChecksumType().getCode());
+    // 6. Bytes covered per checksum.
+    buf.putInt(DEFAULT_BYTES_PER_CHECKSUM);
+    // 7. onDiskDataSizeWithHeader
+    int onDiskDataSizeWithHeader =
+        HFileBlock.HFILEBLOCK_HEADER_SIZE + compressedBlockData.limit();
+    buf.putInt(onDiskDataSizeWithHeader);
+    // 8. Payload.
+    buf.put(compressedBlockData);
+    // 9. Checksum.
+    buf.put(generateChecksumBytes(context.getChecksumType()));
+
+    // Update sizes
+    buf.flip();
+    return buf;
+  }
+
+  /**
+   * Sets start offset of the block in the buffer.
+   */
+  protected void setStartOffsetInBuffForWrite(long startOffsetInBuffForWrite) {
+    this.startOffsetInBuffForWrite = startOffsetInBuffForWrite;
+  }
+
+  /**
+   * Gets start offset of the block in the buffer.
+   */
+  protected long getStartOffsetInBuffForWrite() {
+    return this.startOffsetInBuffForWrite;
+  }
+
+  /**
+   * Returns checksum bytes if checksum type is not NULL.
+   * Note that current HFileReaderImpl does not support non-NULL checksum.
+   */
+  private byte[] generateChecksumBytes(ChecksumType type) {
+    if (type == ChecksumType.NULL) {
+      return EMPTY_BYTE_ARRAY;
+    }
+    throw new HoodieException("Only NULL checksum type is supported");
+  }
+
+  /**
+   * Returns the bytes of the variable length encoding for an integer.
+   * @param length       an integer, normally representing a length.
+   * @return             variable length encoding.
+   * @throws IOException upon error.
+   */
+  static byte[] getVariableLengthEncodedBytes(int length) throws IOException {
+    ByteArrayOutputStream varintBuffer = new ByteArrayOutputStream();
+    CodedOutputStream varintOutput = 
CodedOutputStream.newInstance(varintBuffer);
+    varintOutput.writeUInt32NoTag(length);
+    varintOutput.flush();
+    return varintBuffer.toByteArray();
+  }
 }
diff --git a/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileBlockType.java 
b/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileBlockType.java
index 284988cc4cd..ee800a1152c 100644
--- a/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileBlockType.java
+++ b/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileBlockType.java
@@ -168,4 +168,8 @@ public enum HFileBlockType {
           + new String(magic) + ", got " + new String(buf));
     }
   }
+
+  public byte[] getMagic() {
+    return magic;
+  }
 }
diff --git a/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileContext.java 
b/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileContext.java
index d47daef30ec..d2864e21ac5 100644
--- a/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileContext.java
+++ b/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileContext.java
@@ -20,27 +20,39 @@
 package org.apache.hudi.io.hfile;
 
 import org.apache.hudi.io.compress.CompressionCodec;
-import org.apache.hudi.io.compress.HoodieDecompressor;
-import org.apache.hudi.io.compress.HoodieDecompressorFactory;
+import org.apache.hudi.io.compress.HoodieCompressor;
+import org.apache.hudi.io.compress.HoodieCompressorFactory;
 
 /**
  * The context of HFile that contains information of the blocks.
  */
 public class HFileContext {
   private final CompressionCodec compressionCodec;
-  private final HoodieDecompressor decompressor;
+  private final HoodieCompressor compressor;
+  private final ChecksumType checksumType;
+  private final int blockSize;
 
-  private HFileContext(CompressionCodec compressionCodec) {
+  private HFileContext(CompressionCodec compressionCodec, int blockSize, 
ChecksumType checksumType) {
     this.compressionCodec = compressionCodec;
-    this.decompressor = 
HoodieDecompressorFactory.getDecompressor(compressionCodec);
+    this.compressor = HoodieCompressorFactory.getCompressor(compressionCodec);
+    this.blockSize = blockSize;
+    this.checksumType = checksumType;
   }
 
   CompressionCodec getCompressionCodec() {
     return compressionCodec;
   }
 
-  HoodieDecompressor getDecompressor() {
-    return decompressor;
+  HoodieCompressor getCompressor() {
+    return compressor;
+  }
+
+  int getBlockSize() {
+    return blockSize;
+  }
+
+  ChecksumType getChecksumType() {
+    return checksumType;
   }
 
   public static Builder builder() {
@@ -49,8 +61,12 @@ public class HFileContext {
 
   public static class Builder {
     private CompressionCodec compressionCodec = CompressionCodec.NONE;
+    private int blockSize = 1024 * 1024;
+    private ChecksumType checksumType = ChecksumType.NULL;
 
-    public Builder() {
+    public Builder blockSize(int blockSize) {
+      this.blockSize = blockSize;
+      return this;
     }
 
     public Builder compressionCodec(CompressionCodec compressionCodec) {
@@ -58,8 +74,13 @@ public class HFileContext {
       return this;
     }
 
+    public Builder checksumType(ChecksumType checksumType) {
+      this.checksumType = checksumType;
+      return this;
+    }
+
     public HFileContext build() {
-      return new HFileContext(compressionCodec);
+      return new HFileContext(compressionCodec, blockSize, checksumType);
     }
   }
 }
diff --git a/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileDataBlock.java 
b/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileDataBlock.java
index b7b152d4773..7f62b1c5416 100644
--- a/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileDataBlock.java
+++ b/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileDataBlock.java
@@ -21,6 +21,12 @@ package org.apache.hudi.io.hfile;
 
 import org.apache.hudi.common.util.Option;
 
+import java.io.ByteArrayOutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.hudi.io.hfile.DataSize.SIZEOF_INT16;
 import static 
org.apache.hudi.io.hfile.HFileReader.SEEK_TO_BEFORE_BLOCK_FIRST_KEY;
 import static org.apache.hudi.io.hfile.HFileReader.SEEK_TO_FOUND;
 import static org.apache.hudi.io.hfile.HFileReader.SEEK_TO_IN_RANGE;
@@ -38,7 +44,9 @@ public class HFileDataBlock extends HFileBlock {
 
   // End offset of content in the block, relative to the start of the start of 
the block
   protected final int uncompressedContentEndRelativeOffset;
+  private final List<KeyValueEntry> entriesToWrite = new ArrayList<>();
 
+  // For read purpose.
   protected HFileDataBlock(HFileContext context,
                            byte[] byteBuff,
                            int startOffsetInBuff) {
@@ -48,6 +56,18 @@ public class HFileDataBlock extends HFileBlock {
         this.uncompressedEndOffset - this.sizeCheckSum - 
this.startOffsetInBuff;
   }
 
+  // For write purpose.
+  private HFileDataBlock(HFileContext context, long previousBlockOffset) {
+    super(context, HFileBlockType.DATA, previousBlockOffset);
+    // This is not used for write.
+    uncompressedContentEndRelativeOffset = -1;
+  }
+
+  static HFileDataBlock createDataBlockToWrite(HFileContext context,
+                                               long previousBlockOffset) {
+    return new HFileDataBlock(context, previousBlockOffset);
+  }
+
   /**
    * Seeks to the key to look up. The key may not have an exact match.
    *
@@ -70,7 +90,7 @@ public class HFileDataBlock extends HFileBlock {
    * of the data block; the cursor points to the actual first key of the data 
block which is
    * lexicographically greater than the lookup key.
    */
-  public int seekTo(HFileCursor cursor, Key key, int blockStartOffsetInFile) {
+  int seekTo(HFileCursor cursor, Key key, int blockStartOffsetInFile) {
     int relativeOffset = cursor.getOffset() - blockStartOffsetInFile;
     int lastRelativeOffset = relativeOffset;
     Option<KeyValue> lastKeyValue = cursor.getKeyValue();
@@ -123,7 +143,7 @@ public class HFileDataBlock extends HFileBlock {
    * @param offset offset to read relative to the start of {@code byteBuff}.
    * @return the {@link KeyValue} instance.
    */
-  public KeyValue readKeyValue(int offset) {
+  KeyValue readKeyValue(int offset) {
     return new KeyValue(byteBuff, offset);
   }
 
@@ -135,7 +155,7 @@ public class HFileDataBlock extends HFileBlock {
    *                               HFile.
    * @return {@code true} if there is next {@link KeyValue}; {code false} 
otherwise.
    */
-  public boolean next(HFileCursor cursor, int blockStartOffsetInFile) {
+  boolean next(HFileCursor cursor, int blockStartOffsetInFile) {
     int offset = cursor.getOffset() - blockStartOffsetInFile;
     Option<KeyValue> keyValue = cursor.getKeyValue();
     if (!keyValue.isPresent()) {
@@ -149,4 +169,56 @@ public class HFileDataBlock extends HFileBlock {
   private boolean isAtFirstKey(int relativeOffset) {
     return relativeOffset == HFILEBLOCK_HEADER_SIZE;
   }
+
+  // ================ Below are for Write ================
+
+  boolean isEmpty() {
+    return entriesToWrite.isEmpty();
+  }
+
+  void add(byte[] key, byte[] value) {
+    KeyValueEntry kv = new KeyValueEntry(key, value);
+    // Assume all entries are sorted before write.
+    entriesToWrite.add(kv);
+  }
+
+  int getNumOfEntries() {
+    return entriesToWrite.size();
+  }
+
+  byte[] getFirstKey() {
+    return entriesToWrite.get(0).key;
+  }
+
+  byte[] getLastKeyContent() {
+    if (entriesToWrite.isEmpty()) {
+      return new byte[0];
+    }
+    return entriesToWrite.get(entriesToWrite.size() - 1).key;
+  }
+
+  @Override
+  protected ByteBuffer getUncompressedBlockDataToWrite() {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    ByteBuffer dataBuf = ByteBuffer.allocate(context.getBlockSize());
+    for (KeyValueEntry kv : entriesToWrite) {
+      // Length of key + length of a short variable indicating length of key.
+      dataBuf.putInt(kv.key.length + SIZEOF_INT16);
+      // Length of value.
+      dataBuf.putInt(kv.value.length);
+      // Key content length.
+      dataBuf.putShort((short)kv.key.length);
+      // Key.
+      dataBuf.put(kv.key);
+      // Value.
+      dataBuf.put(kv.value);
+      // MVCC.
+      dataBuf.put((byte)0);
+      // Copy to output stream.
+      baos.write(dataBuf.array(), 0, dataBuf.position());
+      // Clear the buffer.
+      dataBuf.clear();
+    }
+    return ByteBuffer.wrap(baos.toByteArray());
+  }
 }
diff --git 
a/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileFileInfoBlock.java 
b/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileFileInfoBlock.java
index e0b93201924..2f9918f6d07 100644
--- a/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileFileInfoBlock.java
+++ b/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileFileInfoBlock.java
@@ -22,12 +22,16 @@ package org.apache.hudi.io.hfile;
 import org.apache.hudi.io.hfile.protobuf.generated.HFileProtos;
 import org.apache.hudi.io.util.IOUtils;
 
+import com.google.protobuf.ByteString;
+
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Map;
 
 import static org.apache.hudi.common.util.StringUtils.fromUTF8Bytes;
+import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
 
 /**
  * Represents a {@link HFileBlockType#FILE_INFO} block.
@@ -35,6 +39,8 @@ import static 
org.apache.hudi.common.util.StringUtils.fromUTF8Bytes;
 public class HFileFileInfoBlock extends HFileBlock {
   // Magic we put ahead of a serialized protobuf message
   public static final byte[] PB_MAGIC = new byte[] {'P', 'B', 'U', 'F'};
+  // Write properties
+  private final Map<String, byte[]> fileInfoToWrite = new HashMap<>();
 
   public HFileFileInfoBlock(HFileContext context,
                             byte[] byteBuff,
@@ -42,6 +48,14 @@ public class HFileFileInfoBlock extends HFileBlock {
     super(context, HFileBlockType.FILE_INFO, byteBuff, startOffsetInBuff);
   }
 
+  private HFileFileInfoBlock(HFileContext context) {
+    super(context, HFileBlockType.FILE_INFO, -1L);
+  }
+
+  public static HFileFileInfoBlock createFileInfoBlockToWrite(HFileContext 
context) {
+    return new HFileFileInfoBlock(context);
+  }
+
   public HFileInfo readFileInfo() throws IOException {
     int pbMagicLength = PB_MAGIC.length;
     if (IOUtils.compareTo(PB_MAGIC, 0, pbMagicLength,
@@ -61,4 +75,35 @@ public class HFileFileInfoBlock extends HFileBlock {
     }
     return new HFileInfo(fileInfoMap);
   }
+
+  // ================ Below are for Write ================
+
+  public void add(String name, byte[] value) {
+    fileInfoToWrite.put(name, value);
+  }
+
+  @Override
+  public ByteBuffer getUncompressedBlockDataToWrite() {
+    ByteBuffer buff = ByteBuffer.allocate(context.getBlockSize() * 2);
+    HFileProtos.InfoProto.Builder builder =
+        HFileProtos.InfoProto.newBuilder();
+    for (Map.Entry<String, byte[]> e : fileInfoToWrite.entrySet()) {
+      HFileProtos.BytesBytesPair bbp = HFileProtos.BytesBytesPair
+          .newBuilder()
+          .setFirst(ByteString.copyFrom(getUTF8Bytes(e.getKey())))
+          .setSecond(ByteString.copyFrom(e.getValue()))
+          .build();
+      builder.addMapEntry(bbp);
+    }
+    buff.put(PB_MAGIC);
+    byte[] payload = builder.build().toByteArray();
+    try {
+      buff.put(getVariableLengthEncodedBytes(payload.length));
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to calculate File Info variable 
length");
+    }
+    buff.put(payload);
+    buff.flip();
+    return buff;
+  }
 }
diff --git 
a/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileIndexBlock.java 
b/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileIndexBlock.java
new file mode 100644
index 00000000000..72ddd6edf21
--- /dev/null
+++ b/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileIndexBlock.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io.hfile;
+
+import org.apache.hudi.common.util.Option;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public abstract class HFileIndexBlock extends HFileBlock {
+  protected final List<BlockIndexEntry> entries = new ArrayList<>();
+  protected long blockDataSize = -1L;
+
+  protected HFileIndexBlock(HFileContext context,
+                         HFileBlockType blockType,
+                         byte[] byteBuff,
+                         int startOffsetInBuff) {
+    super(context, blockType, byteBuff, startOffsetInBuff);
+  }
+
+  protected HFileIndexBlock(HFileContext context,
+                         HFileBlockType blockType) {
+    super(context, blockType, -1L);
+  }
+
+  public void add(byte[] firstKey, long offset, int size) {
+    Key key = new Key(firstKey);
+    entries.add(new BlockIndexEntry(key, Option.empty(), offset, size));
+  }
+
+  public int getNumOfEntries() {
+    return entries.size();
+  }
+
+  public boolean isEmpty() {
+    return entries.isEmpty();
+  }
+}
diff --git a/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileInfo.java 
b/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileInfo.java
index adc7c312936..b2c8e141675 100644
--- a/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileInfo.java
+++ b/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileInfo.java
@@ -29,13 +29,13 @@ import java.util.Map;
  */
 public class HFileInfo {
   private static final String RESERVED_PREFIX = "hfile.";
-  private static final UTF8StringKey LAST_KEY =
+  static final UTF8StringKey LAST_KEY =
       new UTF8StringKey(RESERVED_PREFIX + "LASTKEY");
   private static final UTF8StringKey FILE_CREATION_TIME_TS =
       new UTF8StringKey(RESERVED_PREFIX + "CREATE_TIME_TS");
   private static final UTF8StringKey KEY_VALUE_VERSION =
       new UTF8StringKey("KEY_VALUE_VERSION");
-  private static final UTF8StringKey MAX_MVCC_TS_KEY =
+  static final UTF8StringKey MAX_MVCC_TS_KEY =
       new UTF8StringKey("MAX_MEMSTORE_TS_KEY");
 
   private static final int KEY_VALUE_VERSION_WITH_MVCC_TS = 1;
diff --git 
a/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileLeafIndexBlock.java 
b/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileLeafIndexBlock.java
index e664145bbc1..a66a85c5b9e 100644
--- a/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileLeafIndexBlock.java
+++ b/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileLeafIndexBlock.java
@@ -21,7 +21,9 @@ package org.apache.hudi.io.hfile;
 
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieException;
 
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -47,6 +49,11 @@ public class HFileLeafIndexBlock extends HFileBlock {
     super(context, blockType, byteBuff, startOffsetInBuff);
   }
 
+  @Override
+  protected ByteBuffer getUncompressedBlockDataToWrite() {
+    throw new HoodieException("HFile writer does not support leaf index 
block");
+  }
+
   /**
    * Reads the index block and returns the block index entries.
    */
diff --git a/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileMetaBlock.java 
b/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileMetaBlock.java
index 67ab0963824..84ffa70f6ff 100644
--- a/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileMetaBlock.java
+++ b/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileMetaBlock.java
@@ -25,15 +25,43 @@ import java.nio.ByteBuffer;
  * Represents a {@link HFileBlockType#META} block.
  */
 public class HFileMetaBlock extends HFileBlock {
+  protected KeyValueEntry entryToWrite;
+
   protected HFileMetaBlock(HFileContext context,
                            byte[] byteBuff,
                            int startOffsetInBuff) {
     super(context, HFileBlockType.META, byteBuff, startOffsetInBuff);
   }
 
+  private HFileMetaBlock(HFileContext context, KeyValueEntry keyValueEntry) {
+    super(context, HFileBlockType.META, -1L);
+    this.entryToWrite = keyValueEntry;
+  }
+
+  static HFileMetaBlock createMetaBlockToWrite(HFileContext context,
+                                               KeyValueEntry keyValueEntry) {
+    return new HFileMetaBlock(context, keyValueEntry);
+  }
+
   public ByteBuffer readContent() {
     return ByteBuffer.wrap(
         getByteBuff(),
         startOffsetInBuff + HFILEBLOCK_HEADER_SIZE, 
uncompressedSizeWithoutHeader);
   }
+
+  // ================ Below are for Write ================
+
+  public byte[] getFirstKey() {
+    return entryToWrite.key;
+  }
+
+  @Override
+  public ByteBuffer getUncompressedBlockDataToWrite() {
+    ByteBuffer dataBuf = ByteBuffer.allocate(context.getBlockSize());
+    // Note that: only value should be store in the block.
+    // The key is stored in the meta index block.
+    dataBuf.put(entryToWrite.value);
+    dataBuf.flip();
+    return dataBuf;
+  }
 }
diff --git 
a/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileMetaIndexBlock.java 
b/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileMetaIndexBlock.java
new file mode 100644
index 00000000000..03159699149
--- /dev/null
+++ b/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileMetaIndexBlock.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io.hfile;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class HFileMetaIndexBlock extends HFileIndexBlock {
+
+  private HFileMetaIndexBlock(HFileContext context) {
+    super(context, HFileBlockType.ROOT_INDEX);
+  }
+
+  public static HFileMetaIndexBlock createMetaIndexBlockToWrite(HFileContext 
context) {
+    return new HFileMetaIndexBlock(context);
+  }
+
+  @Override
+  public ByteBuffer getUncompressedBlockDataToWrite() {
+    ByteBuffer buf = ByteBuffer.allocate(context.getBlockSize() * 2);
+    for (BlockIndexEntry entry : entries) {
+      buf.putLong(entry.getOffset());
+      buf.putInt(entry.getSize());
+      // Key length.
+      try {
+        byte[] keyLength = 
getVariableLengthEncodedBytes(entry.getFirstKey().getLength());
+        buf.put(keyLength);
+      } catch (IOException e) {
+        throw new RuntimeException(
+            "Failed to serialize number: " + entry.getFirstKey().getLength());
+      }
+      // Note that: NO two-bytes for encoding key length.
+      // Key.
+      buf.put(entry.getFirstKey().getBytes());
+    }
+    buf.flip();
+
+    // Set metrics.
+    blockDataSize = buf.limit();
+    return buf;
+  }
+}
diff --git 
a/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileReaderImpl.java 
b/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileReaderImpl.java
index d176af6d251..b3be1e7d6db 100644
--- a/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileReaderImpl.java
+++ b/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileReaderImpl.java
@@ -264,6 +264,14 @@ public class HFileReaderImpl implements HFileReader {
     stream.close();
   }
 
+  HFileTrailer getTrailer() {
+    return trailer;
+  }
+
+  Map<Key, BlockIndexEntry> getDataBlockIndexMap() {
+    return dataBlockIndexEntryMap;
+  }
+
   /**
    * Reads and parses the HFile trailer.
    *
diff --git 
a/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileRootIndexBlock.java 
b/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileRootIndexBlock.java
index 099e42ad44c..19ae69a8517 100644
--- a/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileRootIndexBlock.java
+++ b/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileRootIndexBlock.java
@@ -21,10 +21,14 @@ package org.apache.hudi.io.hfile;
 
 import org.apache.hudi.common.util.Option;
 
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.TreeMap;
 
+import static org.apache.hudi.io.hfile.DataSize.SIZEOF_INT16;
 import static org.apache.hudi.io.util.IOUtils.copy;
 import static org.apache.hudi.io.util.IOUtils.decodeVarLongSizeOnDisk;
 import static org.apache.hudi.io.util.IOUtils.readInt;
@@ -34,13 +38,21 @@ import static org.apache.hudi.io.util.IOUtils.readVarLong;
 /**
  * Represents a {@link HFileBlockType#ROOT_INDEX} block.
  */
-public class HFileRootIndexBlock extends HFileBlock {
+public class HFileRootIndexBlock extends HFileIndexBlock {
   public HFileRootIndexBlock(HFileContext context,
                              byte[] byteBuff,
                              int startOffsetInBuff) {
     super(context, HFileBlockType.ROOT_INDEX, byteBuff, startOffsetInBuff);
   }
 
+  private HFileRootIndexBlock(HFileContext context) {
+    super(context, HFileBlockType.ROOT_INDEX);
+  }
+
+  public static HFileRootIndexBlock createRootIndexBlockToWrite(HFileContext 
context) {
+    return new HFileRootIndexBlock(context);
+  }
+
   /**
    * Reads the index block and returns the block index entry to an in-memory 
{@link TreeMap}
    * for searches.
@@ -86,4 +98,37 @@ public class HFileRootIndexBlock extends HFileBlock {
     }
     return indexEntryList;
   }
+
+  @Override
+  public ByteBuffer getUncompressedBlockDataToWrite() {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    ByteBuffer buf = ByteBuffer.allocate(context.getBlockSize());
+    for (BlockIndexEntry entry : entries) {
+      buf.putLong(entry.getOffset());
+      buf.putInt(entry.getSize());
+
+      // Key length + 2.
+      try {
+        byte[] keyLength = getVariableLengthEncodedBytes(
+            entry.getFirstKey().getLength() + SIZEOF_INT16);
+        buf.put(keyLength);
+      } catch (IOException e) {
+        throw new RuntimeException(
+            "Failed to serialize number: " + entry.getFirstKey().getLength() + 
SIZEOF_INT16);
+      }
+      // Key length.
+      buf.putShort((short) entry.getFirstKey().getLength());
+      // Key.
+      buf.put(entry.getFirstKey().getBytes());
+      // Copy to output stream.
+      baos.write(buf.array(), 0, buf.position());
+      // Clear the buffer.
+      buf.clear();
+    }
+
+    // Output all data in a buffer.
+    byte[] allData = baos.toByteArray();
+    blockDataSize = allData.length;
+    return ByteBuffer.wrap(allData);
+  }
 }
diff --git a/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileTrailer.java 
b/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileTrailer.java
index 7aff7d2c830..474ab71a2ae 100644
--- a/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileTrailer.java
+++ b/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileTrailer.java
@@ -28,7 +28,6 @@ import java.util.Arrays;
 
 import static org.apache.hudi.io.hfile.DataSize.MAGIC_LENGTH;
 import static org.apache.hudi.io.hfile.DataSize.SIZEOF_INT32;
-import static org.apache.hudi.io.hfile.HFileUtils.decodeCompressionCodec;
 
 /**
  * Represents a HFile trailer, which is serialized and deserialized using
@@ -159,7 +158,7 @@ public class HFileTrailer {
       comparatorClassName = trailerProto.getComparatorClassName();
     }
     if (trailerProto.hasCompressionCodec()) {
-      compressionCodec = 
decodeCompressionCodec(trailerProto.getCompressionCodec());
+      compressionCodec = 
CompressionCodec.decodeCompressionCodec(trailerProto.getCompressionCodec());
     } else {
       compressionCodec = CompressionCodec.NONE;
     }
diff --git a/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileUtils.java 
b/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileUtils.java
index bd3568d0b2d..1ded713c59d 100644
--- a/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileUtils.java
+++ b/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileUtils.java
@@ -19,35 +19,14 @@
 
 package org.apache.hudi.io.hfile;
 
-import org.apache.hudi.io.compress.CompressionCodec;
 import org.apache.hudi.io.util.IOUtils;
 
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
 import static org.apache.hudi.common.util.StringUtils.fromUTF8Bytes;
 
 /**
  * Util methods for reading and writing HFile.
  */
 public class HFileUtils {
-  private static final Map<Integer, CompressionCodec> 
HFILE_COMPRESSION_CODEC_MAP = createCompressionCodecMap();
-
-  /**
-   * Gets the compression codec based on the ID.  This ID is written to the 
HFile on storage.
-   *
-   * @param id ID indicating the compression codec.
-   * @return compression codec based on the ID.
-   */
-  public static CompressionCodec decodeCompressionCodec(int id) {
-    CompressionCodec codec = HFILE_COMPRESSION_CODEC_MAP.get(id);
-    if (codec == null) {
-      throw new IllegalArgumentException("Compression code not found for ID: " 
+ id);
-    }
-    return codec;
-  }
-
   /**
    * Reads the HFile major version from the input.
    *
@@ -106,23 +85,4 @@ public class HFileUtils {
   public static String getValue(KeyValue kv) {
     return fromUTF8Bytes(kv.getBytes(), kv.getValueOffset(), 
kv.getValueLength());
   }
-
-  /**
-   * The ID mapping cannot change or else that breaks all existing HFiles out 
there,
-   * even the ones that are not compressed! (They use the NONE algorithm)
-   * This is because HFile stores the ID to indicate which compression codec 
is used.
-   *
-   * @return the mapping of ID to compression codec.
-   */
-  private static Map<Integer, CompressionCodec> createCompressionCodecMap() {
-    Map<Integer, CompressionCodec> result = new HashMap<>();
-    result.put(0, CompressionCodec.LZO);
-    result.put(1, CompressionCodec.GZIP);
-    result.put(2, CompressionCodec.NONE);
-    result.put(3, CompressionCodec.SNAPPY);
-    result.put(4, CompressionCodec.LZ4);
-    result.put(5, CompressionCodec.BZIP2);
-    result.put(6, CompressionCodec.ZSTD);
-    return Collections.unmodifiableMap(result);
-  }
 }
diff --git a/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileMetaBlock.java 
b/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileWriter.java
similarity index 59%
copy from hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileMetaBlock.java
copy to hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileWriter.java
index 67ab0963824..617561f0b43 100644
--- a/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileMetaBlock.java
+++ b/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileWriter.java
@@ -19,21 +19,27 @@
 
 package org.apache.hudi.io.hfile;
 
-import java.nio.ByteBuffer;
+import java.io.Closeable;
+import java.io.IOException;
 
 /**
- * Represents a {@link HFileBlockType#META} block.
+ * The interface for HFile writer to implement.
  */
-public class HFileMetaBlock extends HFileBlock {
-  protected HFileMetaBlock(HFileContext context,
-                           byte[] byteBuff,
-                           int startOffsetInBuff) {
-    super(context, HFileBlockType.META, byteBuff, startOffsetInBuff);
-  }
+public interface HFileWriter extends Closeable {
+  /**
+   * Append a key-value pair into a data block.
+   * The caller must guarantee that the key lexicographically increments or 
the same
+   * as the last key.
+    */
+  void append(String key, byte[] value) throws IOException;
 
-  public ByteBuffer readContent() {
-    return ByteBuffer.wrap(
-        getByteBuff(),
-        startOffsetInBuff + HFILEBLOCK_HEADER_SIZE, 
uncompressedSizeWithoutHeader);
-  }
+  /**
+   * Append a piece of file info.
+   */
+  void appendFileInfo(String name, byte[] value);
+
+  /**
+   * Append a piece of meta info.
+   */
+  void appendMetaInfo(String name, byte[] value);
 }
diff --git 
a/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileWriterImpl.java 
b/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileWriterImpl.java
new file mode 100644
index 00000000000..c8153328ca7
--- /dev/null
+++ b/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileWriterImpl.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io.hfile;
+
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.io.hfile.protobuf.generated.HFileProtos;
+
+import com.google.protobuf.ByteString;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.hudi.io.hfile.DataSize.SIZEOF_INT16;
+import static 
org.apache.hudi.io.hfile.HFileBlock.getVariableLengthEncodedBytes;
+import static org.apache.hudi.io.hfile.HFileBlockType.TRAILER;
+import static org.apache.hudi.io.hfile.HFileInfo.LAST_KEY;
+import static org.apache.hudi.io.hfile.HFileInfo.MAX_MVCC_TS_KEY;
+import static org.apache.hudi.io.hfile.HFileTrailer.TRAILER_SIZE;
+
+/**
+ * Pure Java implementation of HFile writer (HFile v3 format) for Hudi.
+ */
+public class HFileWriterImpl implements HFileWriter {
+  private static final String COMPARATOR_CLASS_NAME
+      = "org.apache.hudi.io.storage.HoodieHBaseKVComparator";
+  private static final byte HFILE_VERSION = (byte) 3;
+  private final OutputStream outputStream;
+  private final HFileContext context;
+  // Meta Info map.
+  private final Map<String, byte[]> metaInfo = new HashMap<>();
+  // Data block under construction.
+  private HFileDataBlock currentDataBlock;
+  // Meta block under construction.
+  private final HFileRootIndexBlock rootIndexBlock;
+  private final HFileMetaIndexBlock metaIndexBlock;
+  private final HFileFileInfoBlock fileInfoBlock;
+  private long uncompressedDataBlockBytes;
+  private long totalUncompressedDataBlockBytes;
+  private long currentOffset;
+  private long loadOnOpenSectionOffset;
+  private final int blockSize;
+
+  // Variables used to record necessary information to reduce
+  // the memory usage.
+  private byte[] lastKey = new byte[0];
+  private long firstDataBlockOffset = -1;
+  private long lastDataBlockOffset;
+  private long totalNumberOfRecords = 0;
+
+  public HFileWriterImpl(HFileContext context, OutputStream outputStream) {
+    this.outputStream = outputStream;
+    this.context = context;
+    this.blockSize = this.context.getBlockSize();
+    this.uncompressedDataBlockBytes = 0L;
+    this.totalUncompressedDataBlockBytes = 0L;
+    this.currentOffset = 0L;
+    this.currentDataBlock = HFileDataBlock.createDataBlockToWrite(context, 
-1L);
+    this.rootIndexBlock = 
HFileRootIndexBlock.createRootIndexBlockToWrite(context);
+    this.metaIndexBlock = 
HFileMetaIndexBlock.createMetaIndexBlockToWrite(context);
+    this.fileInfoBlock = 
HFileFileInfoBlock.createFileInfoBlockToWrite(context);
+    initFileInfo();
+  }
+
+  // Append a data kv pair.
+  public void append(String key, byte[] value) throws IOException {
+    byte[] keyBytes = StringUtils.getUTF8Bytes(key);
+    lastKey = keyBytes;
+    // Records with the same key must be put into the same block.
+    // Here 9 = 4 bytes of key length + 4 bytes of value length + 1 byte MVCC.
+    if (!Arrays.equals(currentDataBlock.getLastKeyContent(), keyBytes)
+        && uncompressedDataBlockBytes + keyBytes.length + value.length + 9 > 
blockSize) {
+      flushCurrentDataBlock();
+      uncompressedDataBlockBytes = 0;
+    }
+    currentDataBlock.add(keyBytes, value);
+    int uncompressedKeyValueSize = keyBytes.length + value.length;
+    uncompressedDataBlockBytes += uncompressedKeyValueSize + 9;
+    totalUncompressedDataBlockBytes += uncompressedKeyValueSize + 9;
+  }
+
+  // Append a metadata kv pair.
+  public void appendMetaInfo(String name, byte[] value) {
+    metaInfo.put(name, value);
+  }
+
+  // Append a file info kv pair.
+  public void appendFileInfo(String name, byte[] value) {
+    fileInfoBlock.add(name, value);
+  }
+
+  @Override
+  public void close() throws IOException {
+    flushCurrentDataBlock();
+    flushMetaBlocks();
+    writeLoadOnOpenSection();
+    writeTrailer();
+    outputStream.flush();
+    outputStream.close();
+  }
+
+  private void flushCurrentDataBlock() throws IOException {
+    // 0. Skip flush if no data.
+    if (currentDataBlock.isEmpty()) {
+      return;
+    }
+    // 1. Update metrics.
+    if (firstDataBlockOffset < 0) {
+      firstDataBlockOffset = currentOffset;
+    }
+    lastDataBlockOffset = currentOffset;
+    totalNumberOfRecords += currentDataBlock.getNumOfEntries();
+    // 2. Flush data block.
+    ByteBuffer blockBuffer = currentDataBlock.serialize();
+    writeBuffer(blockBuffer);
+    // 3. Create an index entry.
+    rootIndexBlock.add(
+        currentDataBlock.getFirstKey(), lastDataBlockOffset, 
blockBuffer.limit());
+    // 4. Create a new data block.
+    currentDataBlock = HFileDataBlock.createDataBlockToWrite(context, 
currentOffset);
+  }
+
+  // NOTE that: reader assumes that every meta info piece
+  // should be a separate meta block.
+  private void flushMetaBlocks() throws IOException {
+    for (Map.Entry<String, byte[]> e : metaInfo.entrySet()) {
+      HFileMetaBlock currentMetaBlock =
+          HFileMetaBlock.createMetaBlockToWrite(
+              context, new KeyValueEntry(StringUtils.getUTF8Bytes(e.getKey()), 
e.getValue()));
+      ByteBuffer blockBuffer = currentMetaBlock.serialize();
+      long blockOffset = currentOffset;
+      currentMetaBlock.setStartOffsetInBuffForWrite(currentOffset);
+      writeBuffer(blockBuffer);
+      metaIndexBlock.add(
+          currentMetaBlock.getFirstKey(), blockOffset, blockBuffer.limit());
+    }
+  }
+
+  private void writeLoadOnOpenSection() throws IOException {
+    loadOnOpenSectionOffset = currentOffset;
+    // Write Root Data Index
+    ByteBuffer dataIndexBuffer = rootIndexBlock.serialize();
+    rootIndexBlock.setStartOffsetInBuffForWrite(currentOffset);
+    writeBuffer(dataIndexBuffer);
+    // Write Meta Data Index.
+    // Note: Even this block is empty, it has to be there
+    //  due to the behavior of the reader.
+    ByteBuffer metaIndexBuffer = metaIndexBlock.serialize();
+    metaIndexBlock.setStartOffsetInBuffForWrite(currentOffset);
+    writeBuffer(metaIndexBuffer);
+    // Write File Info.
+    fileInfoBlock.add(
+        new String(LAST_KEY.getBytes(), StandardCharsets.UTF_8),
+        addKeyLength(lastKey));
+    fileInfoBlock.setStartOffsetInBuffForWrite(currentOffset);
+    writeBuffer(fileInfoBlock.serialize());
+  }
+
+  private void writeTrailer() throws IOException {
+    HFileProtos.TrailerProto.Builder builder = 
HFileProtos.TrailerProto.newBuilder();
+    builder.setFileInfoOffset(fileInfoBlock.getStartOffsetInBuffForWrite());
+    builder.setLoadOnOpenDataOffset(loadOnOpenSectionOffset);
+    builder.setUncompressedDataIndexSize(totalUncompressedDataBlockBytes);
+    builder.setDataIndexCount(rootIndexBlock.getNumOfEntries());
+    builder.setMetaIndexCount(metaIndexBlock.getNumOfEntries());
+    builder.setEntryCount(totalNumberOfRecords);
+    // TODO(HUDI-9464): support multiple levels.
+    builder.setNumDataIndexLevels(1);
+    builder.setFirstDataBlockOffset(firstDataBlockOffset);
+    builder.setLastDataBlockOffset(lastDataBlockOffset);
+    builder.setComparatorClassName(COMPARATOR_CLASS_NAME);
+    builder.setCompressionCodec(context.getCompressionCodec().getId());
+    builder.setEncryptionKey(ByteString.EMPTY);
+    HFileProtos.TrailerProto trailerProto = builder.build();
+
+    ByteBuffer trailer = ByteBuffer.allocate(TRAILER_SIZE);
+    trailer.limit(TRAILER_SIZE);
+    trailer.put(TRAILER.getMagic());
+    
trailer.put(getVariableLengthEncodedBytes(trailerProto.getSerializedSize()));
+    trailer.put(trailerProto.toByteArray());
+    // Force trailer to have fixed length.
+    trailer.position(TRAILER_SIZE - 1);
+    trailer.put(HFILE_VERSION);
+
+    trailer.flip();
+    writeBuffer(trailer);
+  }
+
+  private void writeBuffer(ByteBuffer buffer) throws IOException {
+    // Note that: Use `write(byte[], off, len)`, instead of `write(byte[])`.
+    outputStream.write(buffer.array(), 0, buffer.limit());
+    currentOffset += buffer.limit();
+  }
+
+  private void initFileInfo() {
+    fileInfoBlock.add(
+        new String(MAX_MVCC_TS_KEY.getBytes(), StandardCharsets.UTF_8),
+        new byte[]{0});
+  }
+
+  // Note: HFileReaderImpl assumes that:
+  //   The last key should contain the content length bytes.
+  public byte[] addKeyLength(byte[] key) {
+    if (0 == key.length) {
+      return new byte[0];
+    }
+    ByteBuffer byteBuffer = ByteBuffer.allocate(key.length + SIZEOF_INT16);
+    byteBuffer.putShort((short) key.length);
+    byteBuffer.put(key);
+    return byteBuffer.array();
+  }
+}
diff --git a/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileMetaBlock.java 
b/hudi-io/src/main/java/org/apache/hudi/io/hfile/KeyValueEntry.java
similarity index 64%
copy from hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileMetaBlock.java
copy to hudi-io/src/main/java/org/apache/hudi/io/hfile/KeyValueEntry.java
index 67ab0963824..479096c882b 100644
--- a/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileMetaBlock.java
+++ b/hudi-io/src/main/java/org/apache/hudi/io/hfile/KeyValueEntry.java
@@ -22,18 +22,21 @@ package org.apache.hudi.io.hfile;
 import java.nio.ByteBuffer;
 
 /**
- * Represents a {@link HFileBlockType#META} block.
+ * This is data structure used to store the data before written
+ * into the file. By comparison, {@link KeyValue} is used for read.
  */
-public class HFileMetaBlock extends HFileBlock {
-  protected HFileMetaBlock(HFileContext context,
-                           byte[] byteBuff,
-                           int startOffsetInBuff) {
-    super(context, HFileBlockType.META, byteBuff, startOffsetInBuff);
+public class KeyValueEntry implements Comparable<KeyValueEntry> {
+  public final byte[] key;
+  public final byte[] value;
+
+  public KeyValueEntry(byte[] key, byte[] value) {
+    this.key = key;
+    this.value = value;
   }
 
-  public ByteBuffer readContent() {
-    return ByteBuffer.wrap(
-        getByteBuff(),
-        startOffsetInBuff + HFILEBLOCK_HEADER_SIZE, 
uncompressedSizeWithoutHeader);
+  @Override
+  public int compareTo(KeyValueEntry o) {
+    return ByteBuffer.wrap(this.key)
+        .compareTo(ByteBuffer.wrap(o.key));
   }
 }
diff --git a/hudi-io/src/main/java/org/apache/hudi/io/hfile/UTF8StringKey.java 
b/hudi-io/src/main/java/org/apache/hudi/io/hfile/UTF8StringKey.java
index 672d1a6690a..b1d6592eea2 100644
--- a/hudi-io/src/main/java/org/apache/hudi/io/hfile/UTF8StringKey.java
+++ b/hudi-io/src/main/java/org/apache/hudi/io/hfile/UTF8StringKey.java
@@ -26,7 +26,6 @@ import java.nio.charset.StandardCharsets;
  */
 public class UTF8StringKey extends Key {
   public UTF8StringKey(String key) {
-
     super(key.getBytes(StandardCharsets.UTF_8));
   }
 
diff --git 
a/hudi-io/src/test/java/org/apache/hudi/io/compress/TestHoodieDecompressor.java 
b/hudi-io/src/test/java/org/apache/hudi/io/compress/TestHoodieCompressor.java
similarity index 82%
rename from 
hudi-io/src/test/java/org/apache/hudi/io/compress/TestHoodieDecompressor.java
rename to 
hudi-io/src/test/java/org/apache/hudi/io/compress/TestHoodieCompressor.java
index d6883ce7743..c81af25e33e 100644
--- 
a/hudi-io/src/test/java/org/apache/hudi/io/compress/TestHoodieDecompressor.java
+++ 
b/hudi-io/src/test/java/org/apache/hudi/io/compress/TestHoodieCompressor.java
@@ -19,25 +19,25 @@
 
 package org.apache.hudi.io.compress;
 
+import org.apache.hudi.io.compress.airlift.HoodieAirliftGzipCompressor;
+import org.apache.hudi.io.compress.builtin.HoodieNoneCompressor;
 import org.apache.hudi.io.util.IOUtils;
 
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
 
 import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Random;
-import java.util.zip.GZIPOutputStream;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
 /**
- * Tests all implementation of {@link HoodieDecompressor}.
+ * Tests all implementation of {@link HoodieCompressor}.
  */
-public class TestHoodieDecompressor {
+public class TestHoodieCompressor {
   private static final int INPUT_LENGTH = 394850;
   private static final int[] READ_PART_SIZE_LIST =
       new int[] {1200, 30956, 204958, INPUT_LENGTH + 50};
@@ -49,7 +49,7 @@ public class TestHoodieDecompressor {
     switch (codec) {
       case NONE:
       case GZIP:
-        HoodieDecompressor decompressor = 
HoodieDecompressorFactory.getDecompressor(codec);
+        HoodieCompressor decompressor = 
HoodieCompressorFactory.getCompressor(codec);
         byte[] actualOutput = new byte[INPUT_LENGTH + 100];
         try (InputStream stream = prepareInputStream(codec)) {
           for (int sizeToRead : READ_PART_SIZE_LIST) {
@@ -65,20 +65,18 @@ public class TestHoodieDecompressor {
         break;
       default:
         assertThrows(
-            IllegalArgumentException.class, () -> 
HoodieDecompressorFactory.getDecompressor(codec));
+            IllegalArgumentException.class, () -> 
HoodieCompressorFactory.getCompressor(codec));
     }
   }
 
   private static InputStream prepareInputStream(CompressionCodec codec) throws 
IOException {
     switch (codec) {
       case NONE:
-        return new ByteArrayInputStream(INPUT_BYTES);
+        return new ByteArrayInputStream(
+            new HoodieNoneCompressor().compress(INPUT_BYTES));
       case GZIP:
-        ByteArrayOutputStream stream = new ByteArrayOutputStream();
-        try (GZIPOutputStream gzipOutputStream = new GZIPOutputStream(stream)) 
{
-          gzipOutputStream.write(INPUT_BYTES);
-        }
-        return new ByteArrayInputStream(stream.toByteArray());
+        return new ByteArrayInputStream(
+            new HoodieAirliftGzipCompressor().compress(INPUT_BYTES));
       default:
         throw new IllegalArgumentException("Not supported in tests.");
     }
diff --git 
a/hudi-io/src/test/java/org/apache/hudi/io/hfile/TestHFileWriter.java 
b/hudi-io/src/test/java/org/apache/hudi/io/hfile/TestHFileWriter.java
new file mode 100644
index 00000000000..a90c582c51f
--- /dev/null
+++ b/hudi-io/src/test/java/org/apache/hudi/io/hfile/TestHFileWriter.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io.hfile;
+
+import org.apache.hudi.common.util.io.ByteBufferBackedInputStream;
+import org.apache.hudi.io.ByteArraySeekableDataInputStream;
+import org.apache.hudi.io.SeekableDataInputStream;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.util.Arrays;
+
+import static org.apache.hudi.io.hfile.HFileBlockType.DATA;
+import static org.apache.hudi.io.hfile.HFileBlockType.TRAILER;
+import static org.apache.hudi.io.hfile.HFileInfo.LAST_KEY;
+import static org.apache.hudi.io.hfile.HFileInfo.MAX_MVCC_TS_KEY;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class TestHFileWriter {
+  private static final Logger LOG = 
LoggerFactory.getLogger(TestHFileWriter.class);
+  private static final String TEST_FILE = "test.hfile";
+  private static final HFileContext CONTEXT = HFileContext.builder().build();
+
+  @AfterEach
+  public void tearDown() throws IOException {
+    Files.deleteIfExists(Paths.get(TEST_FILE));
+  }
+
+  @Test
+  void testOverflow() throws Exception {
+    // 1. Write data.
+    writeTestFile();
+    // 2. Validate file size.
+    validateHFileSize();
+    // 3. Validate file structure.
+    validateHFileStructure();
+    // 4. Validate consistency with HFileReader.
+    validateConsistencyWithHFileReader();
+    LOG.info("All validations passed!");
+  }
+
+  @Test
+  void testSameKeyLocation() throws IOException {
+    // 50 bytes for data part limit.
+    HFileContext context = new HFileContext.Builder().blockSize(50).build();
+    String testFile = TEST_FILE;
+    try (DataOutputStream outputStream =
+             new DataOutputStream(Files.newOutputStream(Paths.get(testFile)));
+        HFileWriter writer = new HFileWriterImpl(context, outputStream)) {
+      for (int i = 0; i < 10; i++) {
+        writer.append("key00", String.format("value%02d", i).getBytes());
+      }
+      for (int i = 1; i < 11; i++) {
+        writer.append(
+            String.format("key%02d", i),
+            String.format("value%02d", i).getBytes());
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    // Validate.
+    try (FileChannel channel = FileChannel.open(Paths.get(testFile), 
StandardOpenOption.READ)) {
+      ByteBuffer buf = channel.map(FileChannel.MapMode.READ_ONLY, 0, 
channel.size());
+      SeekableDataInputStream inputStream =
+          new ByteArraySeekableDataInputStream(new 
ByteBufferBackedInputStream(buf));
+      HFileReaderImpl reader = new HFileReaderImpl(inputStream, 
channel.size());
+      reader.initializeMetadata();
+      assertEquals(20, reader.getNumKeyValueEntries());
+      HFileTrailer trailer = reader.getTrailer();
+      assertEquals(6, trailer.getDataIndexCount());
+      int i = 0;
+      for (Key key : reader.getDataBlockIndexMap().keySet()) {
+        assertArrayEquals(
+            String.format("key%02d", i).getBytes(),
+            key.getContentInString().getBytes());
+        if (i == 0) {
+          i++;
+        } else {
+          i += 2;
+        }
+      }
+    }
+  }
+
+  @Test
+  void testUniqueKeyLocation() throws IOException {
+    // 50 bytes for data part limit.
+    HFileContext context = new HFileContext.Builder().blockSize(50).build();
+    String testFile = TEST_FILE;
+    try (DataOutputStream outputStream =
+             new DataOutputStream(Files.newOutputStream(Paths.get(testFile)));
+         HFileWriter writer = new HFileWriterImpl(context, outputStream)) {
+      for (int i = 0; i < 50; i++) {
+        writer.append(
+            String.format("key%02d", i), String.format("value%02d", 
i).getBytes());
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    // Validate.
+    try (FileChannel channel = FileChannel.open(Paths.get(testFile), 
StandardOpenOption.READ)) {
+      ByteBuffer buf = channel.map(FileChannel.MapMode.READ_ONLY, 0, 
channel.size());
+      SeekableDataInputStream inputStream =
+          new ByteArraySeekableDataInputStream(new 
ByteBufferBackedInputStream(buf));
+      HFileReaderImpl reader = new HFileReaderImpl(inputStream, 
channel.size());
+      reader.initializeMetadata();
+      assertEquals(50, reader.getNumKeyValueEntries());
+      HFileTrailer trailer = reader.getTrailer();
+      assertEquals(25, trailer.getDataIndexCount());
+      reader.seekTo();
+      for (int i = 0; i < 50; i++) {
+        KeyValue kv = reader.getKeyValue().get();
+        System.out.println(kv.getKey().getContentInString());
+        assertArrayEquals(
+            String.format("key%02d", i).getBytes(),
+            kv.getKey().getContentInString().getBytes());
+        assertArrayEquals(
+            String.format("value%02d", i).getBytes(),
+            Arrays.copyOfRange(
+                kv.getBytes(),
+                kv.getValueOffset(),
+                kv.getValueOffset() + kv.getValueLength())
+        );
+        reader.next();
+      }
+    }
+  }
+
+  private static void writeTestFile() throws Exception {
+    try (
+        DataOutputStream outputStream =
+             new DataOutputStream(Files.newOutputStream(Paths.get(TEST_FILE)));
+        HFileWriter writer = new HFileWriterImpl(CONTEXT, outputStream)) {
+      writer.append("key1", "value1".getBytes());
+      writer.append("key2", "value2".getBytes());
+      writer.append("key3", "value3".getBytes());
+    }
+  }
+
+  private static void validateHFileSize() throws IOException {
+    Path path = Paths.get(TEST_FILE);
+    long actualSize = Files.size(path);
+    long expectedSize = 4366L;
+    assertEquals(expectedSize, actualSize);
+  }
+
+  private static void validateHFileStructure() throws IOException {
+    ByteBuffer fileBuffer = mapFileToBuffer();
+
+    // 1. Validate Trailer
+    validateTrailer(fileBuffer);
+
+    // 2. Validate Data block.
+    validateDataBlocks(fileBuffer);
+  }
+
+  private static void validateConsistencyWithHFileReader() throws IOException {
+    ByteBuffer content = mapFileToBuffer();
+    try (HFileReader reader = new HFileReaderImpl(
+        new ByteArraySeekableDataInputStream(
+            new ByteBufferBackedInputStream(content)), content.limit())) {
+      reader.initializeMetadata();
+      assertEquals(3, reader.getNumKeyValueEntries());
+      assertTrue(reader.getMetaInfo(LAST_KEY).isPresent());
+      assertEquals(1, reader.getMetaInfo(MAX_MVCC_TS_KEY).get().length);
+    }
+  }
+
+  private static ByteBuffer mapFileToBuffer() throws IOException {
+    try (FileChannel channel = FileChannel.open(Paths.get(TEST_FILE), 
StandardOpenOption.READ)) {
+      return channel.map(FileChannel.MapMode.READ_ONLY, 0, channel.size());
+    }
+  }
+
+  private static void validateTrailer(ByteBuffer buf) {
+    int trailerStart = Math.max(0, buf.limit() - 4096);
+    buf.position(trailerStart);
+
+    // Verify magic
+    byte[] trailerMagic = new byte[8];
+    buf.get(trailerMagic);
+    assertArrayEquals(TRAILER.getMagic(), trailerMagic);
+
+    // Verify version (last 4 bytes of trailer)
+    buf.position(trailerStart + 4096 - 4);
+    byte[] versionBytes = new byte[4];
+    buf.get(versionBytes);
+    int version = ByteBuffer.wrap(versionBytes).getInt();
+    assertEquals(3, version);
+  }
+
+  private static void validateDataBlocks(ByteBuffer buf) {
+    // Point to the first data block.
+    buf.position(0);
+
+    // Validate magic.
+    byte[] dataBlockMagic = new byte[8];
+    buf.get(dataBlockMagic);
+    assertArrayEquals(DATA.getMagic(), dataBlockMagic);
+
+    // Skip header.
+    buf.position(buf.position() + 25);
+
+    // Validate data.
+    validateKeyValue(buf, "key1", "value1");
+    validateKeyValue(buf, "key2", "value2");
+    validateKeyValue(buf, "key3", "value3");
+  }
+
+  private static void validateKeyValue(ByteBuffer buf, String expectedKey, 
String expectedValue) {
+    int keyLen = buf.getInt();
+    int valLen = buf.getInt();
+
+    byte[] key = new byte[keyLen];
+    buf.get(key);
+    byte[] keyContent = Arrays.copyOfRange(key, 2, key.length);
+    assertArrayEquals(expectedKey.getBytes(StandardCharsets.UTF_8), 
keyContent);
+
+    byte[] value = new byte[valLen];
+    buf.get(value);
+    assertArrayEquals(expectedValue.getBytes(StandardCharsets.UTF_8), value);
+
+    buf.get(); // Skip MVCC timestamp
+  }
+
+  private static void assertArrayEquals(byte[] expected, byte[] actual) {
+    if (!Arrays.equals(expected, actual)) {
+      throw new AssertionError("Byte array mismatch");
+    }
+  }
+}

Reply via email to