This is an automated email from the ASF dual-hosted git repository.

voonhous pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 163a15a3c899 refactor: Add Lombok Builder annotation to HoodieLogFormat (#17785)
163a15a3c899 is described below

commit 163a15a3c899130745071396171fe451e6e62ab1
Author: voonhous <[email protected]>
AuthorDate: Wed Jun 3 20:06:12 2026 +0800

    refactor: Add Lombok Builder annotation to HoodieLogFormat (#17785)
    
    * refactor: Add Lombok Builder annotation to HoodieLogFormat
    
    * Addressed comments
    
    * Addressed comments
    
    - Restore fixed DEFAULT_SIZE_THRESHOLD as the HDFS block size in createNewFile, decoupling it from the user-configurable log rollover threshold
    - Fix stale 'file len' comment -> 'file size' and drop redundant '= 0L' initializer on Writer.fileSize
---
 .../cli/commands/TestHoodieLogFileCommand.java     |  34 +-
 .../timeline/versioning/v1/TimelineArchiverV1.java |  11 +-
 .../org/apache/hudi/io/HoodieAppendHandle.java     |   4 +-
 .../java/org/apache/hudi/io/HoodieWriteHandle.java |  13 +-
 .../metadata/HoodieBackedTableMetadataWriter.java  |   7 +-
 .../table/action/rollback/RollbackHelperV1.java    |   7 +-
 .../utils/TestLegacyArchivedMetaEntryReader.java   |  12 +-
 .../hudi/testutils/HoodieWriteableTestTable.java   |  12 +-
 .../testutils/HoodieFlinkWriteableTestTable.java   |  14 +-
 .../hudi/common/table/log/HoodieLogFormat.java     | 323 +++++++----------
 .../common/table/log/HoodieLogFormatWriter.java    | 114 +++---
 .../common/functional/TestHoodieLogFormat.java     | 402 ++++++++++++++-------
 .../TestHoodieLogFormatAppendFailure.java          |  22 +-
 .../hudi/common/table/TestTableSchemaResolver.java |  10 +-
 ....java => TestHoodieLogFormatWriterBuilder.java} |  13 +-
 .../table/timeline/TestArchivedTimelineV1.java     |  20 +-
 .../common/testutils/HoodieCommonTestHarness.java  |  14 +-
 .../testutils/reader/HoodieFileSliceTestUtils.java |   7 +-
 .../hudi/hadoop/testutils/InputFormatTestUtil.java |  25 +-
 ...dieSparkMergeOnReadTableInsertUpdateDelete.java |   7 +-
 .../procedure/TestShowTimelineTableProcedure.scala |   8 +-
 .../apache/hudi/hive/testutils/HiveTestUtil.java   |  18 +-
 .../TestHoodieMetadataTableValidator.java          |   7 +-
 23 files changed, 610 insertions(+), 494 deletions(-)

diff --git 
a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
 
b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
index b067a6ddafc8..1df0b58a7037 100644
--- 
a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
+++ 
b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
@@ -36,6 +36,7 @@ import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.schema.HoodieSchemaUtils;
 import org.apache.hudi.common.table.log.HoodieLogFormat;
+import org.apache.hudi.common.table.log.HoodieLogFormatWriter;
 import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
 import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
 import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
@@ -109,11 +110,14 @@ public class TestHoodieLogFileCommand extends 
CLIFunctionalTestHarness {
     Files.createDirectories(Paths.get(partitionPath));
     storage = HoodieStorageUtils.getStorage(tablePath, storageConf());
 
-    try (HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
-        .onParentPath(new StoragePath(partitionPath))
+    try (HoodieLogFormat.Writer writer = HoodieLogFormatWriter.builder()
+        .withParentPath(new StoragePath(partitionPath))
         .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-        
.withFileId("test-log-fileid1").withInstantTime("100").withStorage(storage)
-        .withSizeThreshold(1).build()) {
+        .withLogFileId("test-log-fileid1")
+        .withInstantTime("100")
+        .withStorage(storage)
+        .withSizeThreshold(1L)
+        .build()) {
 
       // write data to file
       List<HoodieRecord> records = SchemaTestUtil.generateTestRecords(0, 
100).stream().map(HoodieAvroIndexedRecord::new).collect(Collectors.toList());
@@ -203,16 +207,14 @@ public class TestHoodieLogFileCommand extends 
CLIFunctionalTestHarness {
     partitionPath = tablePath + StoragePath.SEPARATOR + 
HoodieTestCommitMetadataGenerator.DEFAULT_SECOND_PARTITION_PATH;
     Files.createDirectories(Paths.get(partitionPath));
 
-    HoodieLogFormat.Writer writer = null;
-    try {
-      // set little threshold to split file.
-      writer =
-          HoodieLogFormat.newWriterBuilder().onParentPath(new 
StoragePath(partitionPath))
-              .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-              
.withFileId("test-log-fileid1").withInstantTime(INSTANT_TIME).withStorage(
-                  storage)
-              .withSizeThreshold(500).build();
-
+    try (HoodieLogFormat.Writer writer = HoodieLogFormatWriter.builder()
+        .withParentPath(new StoragePath(partitionPath))
+        .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
+        .withLogFileId("test-log-fileid1")
+        .withInstantTime(INSTANT_TIME)
+        .withStorage(storage)
+        .withSizeThreshold(500L) // set little threshold to split file.
+        .build()) {
       SchemaTestUtil testUtil = new SchemaTestUtil();
       List<HoodieRecord> records1 = testUtil.generateHoodieTestRecords(0, 
100).stream().map(HoodieAvroIndexedRecord::new).collect(Collectors.toList());
       Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
@@ -220,10 +222,6 @@ public class TestHoodieLogFileCommand extends 
CLIFunctionalTestHarness {
       header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
       HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1, 
header, HoodieRecord.RECORD_KEY_METADATA_FIELD);
       writer.appendBlock(dataBlock);
-    } finally {
-      if (writer != null) {
-        writer.close();
-      }
     }
 
     Object result = shell.evaluate(() -> "show logfile records 
--logFilePathPattern "
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v1/TimelineArchiverV1.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v1/TimelineArchiverV1.java
index d518ac5525dd..bab78324d7de 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v1/TimelineArchiverV1.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v1/TimelineArchiverV1.java
@@ -31,8 +31,8 @@ import org.apache.hudi.common.model.HoodieAvroPayload;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.log.HoodieLogFormat;
 import org.apache.hudi.common.table.log.HoodieLogFormat.Writer;
+import org.apache.hudi.common.table.log.HoodieLogFormatWriter;
 import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import 
org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
@@ -116,9 +116,12 @@ public class TimelineArchiverV1<T extends 
HoodieAvroPayload, I, K, O> implements
   private Writer openWriter(StoragePath archivePath) {
     try {
       if (this.writer == null) {
-        return 
HoodieLogFormat.newWriterBuilder().onParentPath(archivePath).withInstantTime("")
-            
.withFileId(archiveFilePath.getName()).withFileExtension(HoodieArchivedLogFile.ARCHIVE_EXTENSION)
-            .withStorage(metaClient.getStorage()).build();
+        return HoodieLogFormatWriter.builder()
+            .withParentPath(archivePath).withInstantTime("")
+            .withLogFileId(archiveFilePath.getName())
+            .withFileExtension(HoodieArchivedLogFile.ARCHIVE_EXTENSION)
+            .withStorage(metaClient.getStorage())
+            .build();
       } else {
         return this.writer;
       }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
index 5ea8ba460f87..3b825bbf1ade 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
@@ -39,7 +39,7 @@ import org.apache.hudi.common.schema.HoodieSchemaField;
 import org.apache.hudi.common.schema.HoodieSchemaUtils;
 import org.apache.hudi.common.table.HoodieTableVersion;
 import org.apache.hudi.common.table.log.AppendResult;
-import org.apache.hudi.common.table.log.HoodieLogFormat.Writer;
+import org.apache.hudi.common.table.log.HoodieLogFormat;
 import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
 import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
 import org.apache.hudi.common.table.log.block.HoodieHFileDataBlock;
@@ -105,7 +105,7 @@ public class HoodieAppendHandle<T, I, K, O> extends 
HoodieWriteHandle<T, I, K, O
   // Incoming records to be written to logs.
   protected Iterator<HoodieRecord<T>> recordItr;
   // Writer to log into the file group's latest slice.
-  protected Writer writer;
+  protected HoodieLogFormat.Writer writer;
 
   protected final List<WriteStatus> statuses;
   // Total number of records written during appending
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
index b1d8e4305971..4eebea25a01e 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
@@ -37,6 +37,7 @@ import org.apache.hudi.common.schema.HoodieSchemaUtils;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTableVersion;
 import org.apache.hudi.common.table.log.HoodieLogFormat;
+import org.apache.hudi.common.table.log.HoodieLogFormatWriter;
 import org.apache.hudi.common.table.log.LogFileCreationCallback;
 import org.apache.hudi.common.table.read.DeleteContext;
 import org.apache.hudi.common.util.ConfigUtils;
@@ -289,9 +290,9 @@ public abstract class HoodieWriteHandle<T, I, K, O> extends 
HoodieIOHandle<T, I,
   protected HoodieLogFormat.Writer createLogWriter(String instantTime, String 
fileSuffix, Option<FileSlice> fileSliceOpt) {
     try {
       if 
(config.getWriteVersion().greaterThanOrEquals(HoodieTableVersion.EIGHT)) {
-        return HoodieLogFormat.newWriterBuilder()
-            
.onParentPath(FSUtils.constructAbsolutePath(hoodieTable.getMetaClient().getBasePath(),
 partitionPath))
-            .withFileId(fileId)
+        return HoodieLogFormatWriter.builder()
+            
.withParentPath(FSUtils.constructAbsolutePath(hoodieTable.getMetaClient().getBasePath(),
 partitionPath))
+            .withLogFileId(fileId)
             .withInstantTime(instantTime)
             .withFileSize(0L)
             .withSizeThreshold(config.getLogFileMaxSize())
@@ -306,9 +307,9 @@ public abstract class HoodieWriteHandle<T, I, K, O> extends 
HoodieIOHandle<T, I,
         Option<HoodieLogFile> latestLogFile = fileSliceOpt.isPresent()
             ? fileSliceOpt.get().getLatestLogFile()
             : Option.empty();
-        return HoodieLogFormat.newWriterBuilder()
-            
.onParentPath(FSUtils.constructAbsolutePath(hoodieTable.getMetaClient().getBasePath(),
 partitionPath))
-            .withFileId(fileId)
+        return HoodieLogFormatWriter.builder()
+            
.withParentPath(FSUtils.constructAbsolutePath(hoodieTable.getMetaClient().getBasePath(),
 partitionPath))
+            .withLogFileId(fileId)
             .withInstantTime(instantTime)
             
.withLogVersion(latestLogFile.map(HoodieLogFile::getLogVersion).orElse(HoodieLogFile.LOGFILE_BASE_VERSION))
             
.withFileSize(latestLogFile.map(HoodieLogFile::getFileSize).orElse(0L))
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index ce5a4eeb0a2f..6748c8918a46 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -60,6 +60,7 @@ import org.apache.hudi.common.schema.HoodieSchemaUtils;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.log.HoodieLogFormat;
+import org.apache.hudi.common.table.log.HoodieLogFormatWriter;
 import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
 import 
org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
 import org.apache.hudi.common.table.read.HoodieFileGroupReader;
@@ -1197,9 +1198,9 @@ public abstract class HoodieBackedTableMetadataWriter<I, 
O> implements HoodieTab
 
         final HoodieDeleteBlock block = new 
HoodieDeleteBlock(Collections.emptyList(), blockHeader);
 
-        try (HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
-            
.onParentPath(FSUtils.constructAbsolutePath(metadataWriteConfig.getBasePath(), 
relativePartitionPath))
-            .withFileId(fileGroupFileId)
+        try (HoodieLogFormat.Writer writer = HoodieLogFormatWriter.builder()
+            
.withParentPath(FSUtils.constructAbsolutePath(metadataWriteConfig.getBasePath(),
 relativePartitionPath))
+            .withLogFileId(fileGroupFileId)
             .withInstantTime(instantTime)
             .withLogVersion(HoodieLogFile.LOGFILE_BASE_VERSION)
             .withFileSize(0L)
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackHelperV1.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackHelperV1.java
index 79a28421751e..1b288a70e8ba 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackHelperV1.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackHelperV1.java
@@ -30,6 +30,7 @@ import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.IOType;
 import org.apache.hudi.common.table.HoodieTableVersion;
 import org.apache.hudi.common.table.log.HoodieLogFormat;
+import org.apache.hudi.common.table.log.HoodieLogFormatWriter;
 import org.apache.hudi.common.table.log.LogFileCreationCallback;
 import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock;
@@ -311,9 +312,9 @@ public class RollbackHelperV1 extends RollbackHelper {
           // Let's emit markers for rollback as well. markers are emitted 
under rollback instant time.
           WriteMarkers writeMarkers = 
WriteMarkersFactory.get(config.getMarkersType(), table, instantTime);
 
-          HoodieLogFormat.WriterBuilder writerBuilder = 
HoodieLogFormat.newWriterBuilder()
-              
.onParentPath(FSUtils.constructAbsolutePath(metaClient.getBasePath(), 
partitionPath))
-              .withFileId(fileId)
+          HoodieLogFormatWriter.HoodieLogFormatWriterBuilder writerBuilder = 
HoodieLogFormatWriter.builder()
+              
.withParentPath(FSUtils.constructAbsolutePath(metaClient.getBasePath(), 
partitionPath))
+              .withLogFileId(fileId)
               
.withLogWriteToken(CommonClientUtils.generateWriteToken(taskContextSupplier))
               
.withInstantTime(tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT)
                   ? instantToRollback.requestedTime() : 
rollbackRequest.getLatestBaseInstant()
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/utils/TestLegacyArchivedMetaEntryReader.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/utils/TestLegacyArchivedMetaEntryReader.java
index 5317583bbe9c..ac6e7df56038 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/utils/TestLegacyArchivedMetaEntryReader.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/utils/TestLegacyArchivedMetaEntryReader.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.log.HoodieLogFormat;
+import org.apache.hudi.common.table.log.HoodieLogFormatWriter;
 import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import org.apache.hudi.common.table.timeline.ActiveAction;
@@ -101,10 +102,13 @@ public class TestLegacyArchivedMetaEntryReader {
 
   private HoodieLogFormat.Writer openWriter(HoodieTableMetaClient metaClient) {
     try {
-      return HoodieLogFormat.newWriterBuilder()
-          .onParentPath(metaClient.getArchivePath())
-          
.withFileId("commits").withFileExtension(HoodieArchivedLogFile.ARCHIVE_EXTENSION)
-          .withStorage(metaClient.getStorage()).withInstantTime("").build();
+      return HoodieLogFormatWriter.builder()
+          .withParentPath(metaClient.getArchivePath())
+          .withLogFileId("commits")
+          .withFileExtension(HoodieArchivedLogFile.ARCHIVE_EXTENSION)
+          .withStorage(metaClient.getStorage())
+          .withInstantTime("")
+          .build();
     } catch (IOException e) {
       throw new HoodieException("Unable to initialize HoodieLogFormat writer", 
e);
     }
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java
index ed9f1ef17645..78e158f78608 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java
@@ -33,6 +33,7 @@ import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.log.HoodieLogFormat;
+import org.apache.hudi.common.table.log.HoodieLogFormatWriter;
 import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import org.apache.hudi.common.testutils.FileCreateUtilsLegacy;
@@ -168,10 +169,13 @@ public class HoodieWriteableTestTable extends 
HoodieMetadataTestTable {
   }
 
   private Pair<String, HoodieLogFile> appendRecordsToLogFile(String 
partitionPath, String fileId, List<HoodieRecord> records) throws Exception {
-    try (HoodieLogFormat.Writer logWriter = HoodieLogFormat.newWriterBuilder()
-        .onParentPath(new StoragePath(basePath, partitionPath))
-        .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId(fileId)
-        .withInstantTime(currentInstantTime).withStorage(storage).build()) {
+    try (HoodieLogFormat.Writer logWriter = HoodieLogFormatWriter.builder()
+        .withParentPath(new StoragePath(basePath, partitionPath))
+        .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
+        .withLogFileId(fileId)
+        .withInstantTime(currentInstantTime)
+        .withStorage(storage)
+        .build()) {
       Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
       header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, 
currentInstantTime);
       header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
diff --git 
a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/testutils/HoodieFlinkWriteableTestTable.java
 
b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/testutils/HoodieFlinkWriteableTestTable.java
index 09e24fbbb0fe..8d4395a50d7d 100644
--- 
a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/testutils/HoodieFlinkWriteableTestTable.java
+++ 
b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/testutils/HoodieFlinkWriteableTestTable.java
@@ -32,6 +32,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.log.HoodieLogFormat;
+import org.apache.hudi.common.table.log.HoodieLogFormatWriter;
 import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
 import 
org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
 import org.apache.hudi.common.util.collection.Pair;
@@ -136,10 +137,13 @@ public class HoodieFlinkWriteableTestTable extends 
HoodieWriteableTestTable {
   private Pair<String, HoodieLogFile> 
appendRecordsToLogFile(List<HoodieRecord> groupedRecords) throws Exception {
     String partitionPath = groupedRecords.get(0).getPartitionPath();
     HoodieRecordLocation location = groupedRecords.get(0).getCurrentLocation();
-    try (HoodieLogFormat.Writer logWriter = HoodieLogFormat.newWriterBuilder()
-        .onParentPath(new StoragePath(basePath, partitionPath))
-        
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId(location.getFileId())
-        
.withInstantTime(location.getInstantTime()).withStorage(storage).build()) {
+    try (HoodieLogFormat.Writer logWriter = HoodieLogFormatWriter.builder()
+        .withParentPath(new StoragePath(basePath, partitionPath))
+        .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
+        .withLogFileId(location.getFileId())
+        .withInstantTime(location.getInstantTime())
+        .withStorage(storage)
+        .build()) {
       Map<HeaderMetadataType, String> header = new java.util.HashMap<>();
       header.put(HeaderMetadataType.INSTANT_TIME, location.getInstantTime());
       header.put(HeaderMetadataType.SCHEMA, schema.toString());
@@ -150,7 +154,7 @@ public class HoodieFlinkWriteableTestTable extends 
HoodieWriteableTestTable {
           HoodieAvroUtils.addHoodieKeyToRecord(val, r.getRecordKey(), 
r.getPartitionPath(), "");
           return (IndexedRecord) val;
         } catch (IOException e) {
-          log.warn("Failed to convert record " + r.toString(), e);
+          log.warn("Failed to convert record {}", r, e);
           return null;
         }
       }).map(HoodieAvroIndexedRecord::new).collect(Collectors.toList()), 
header, HoodieRecord.RECORD_KEY_METADATA_FIELD));
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java
index 2c536ec07722..a675e0d9da89 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java
@@ -24,13 +24,13 @@ import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.table.HoodieTableVersion;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -60,31 +60,140 @@ public interface HoodieLogFormat {
 
   String DEFAULT_WRITE_TOKEN = "0-0-0";
 
-  String DEFAULT_LOG_FORMAT_WRITER = 
"org.apache.hudi.common.table.log.HoodieLogFormatWriter";
-
   /**
-   * Writer interface to allow appending block to this file format.
+   * Abstract base class for appending blocks to the Hoodie log format.
+   * Subclasses provide specific implementations for writing to different 
storage layers.
    */
-  interface Writer extends Closeable {
+  @Getter
+  @Slf4j
+  abstract class Writer implements Closeable {
+
+    // Default max log file size 512 MB
+    public static final long DEFAULT_SIZE_THRESHOLD = 512 * 1024 * 1024L;
+
+    // Buffer size
+    protected Integer bufferSize;
+    // FileSystem
+    protected HoodieStorage storage;
+    // Size threshold for the log file. Useful when used with a rolling log 
appender
+    protected Long sizeThreshold;
+    // Log File extension. Could be .avro.delta or .avro.commits etc
+    protected String fileExtension;
+    // File Id
+    protected String logFileId;
+    // File Commit Time stamp
+    protected String instantTime;
+    // version number for this log file. If not specified, then the current 
version will be
+    // computed by inspecting the file system
+    protected Integer logVersion;
+    // file size of this log file
+    protected Long fileSize;
+    // Location of the directory containing the log
+    protected StoragePath parentPath;
+    // Log File Write Token
+    protected String logWriteToken;
+    // optional file suffix
+    protected String suffix;
+    // file creation hook
+    protected LogFileCreationCallback fileCreationCallback;
+    protected HoodieLogFile logFile;
+
+    protected HoodieTableVersion tableVersion;
 
     /**
-     * @return the path to the current {@link HoodieLogFile} being written to.
+     * Base constructor that performs the core Hudi Log logic.
      */
-    HoodieLogFile getLogFile();
+    protected Writer(
+        Integer bufferSize,
+        HoodieStorage storage,
+        StoragePath parentPath,
+        String logFileId,
+        String fileExtension,
+        String instantTime,
+        Integer logVersion,
+        String logWriteToken,
+        String suffix,
+        Long fileSize,
+        Long sizeThreshold,
+        LogFileCreationCallback fileCreationCallback,
+        HoodieTableVersion tableVersion) throws IOException {
+      log.info("Building HoodieLogFormat.Writer");
+
+      // Validation
+      ValidationUtils.checkArgument(storage != null, "Storage is not 
specified");
+      ValidationUtils.checkArgument(logFileId != null, "FileID is not 
specified");
+      ValidationUtils.checkArgument(instantTime != null, "Instant time is not 
specified");
+      ValidationUtils.checkArgument(fileExtension != null, "File extension is 
not specified");
+      ValidationUtils.checkArgument(parentPath != null, "Log file parent 
location is not specified");
+
+      this.bufferSize = bufferSize != null ? bufferSize : 
storage.getDefaultBufferSize();
+      this.storage = storage;
+      this.parentPath = parentPath;
+      this.logFileId = logFileId;
+      this.fileExtension = fileExtension;
+      this.instantTime = instantTime;
+      this.logVersion = logVersion;
+      this.logWriteToken = logWriteToken;
+      this.suffix = suffix;
+
+      // Defaults and logic
+      this.fileSize = fileSize != null ? fileSize : 0L;
+      this.sizeThreshold = sizeThreshold != null ? sizeThreshold : 
DEFAULT_SIZE_THRESHOLD;
+      // Does nothing by default
+      this.fileCreationCallback = fileCreationCallback != null ? 
fileCreationCallback : new LogFileCreationCallback() {};
+      this.tableVersion = tableVersion != null ? tableVersion : 
HoodieTableVersion.current();
+
+      // Log version computation
+      if (this.logVersion == null) {
+        log.info("Computing next log version for {} in {}", logFileId, 
parentPath);
+        boolean useBaseVersion = 
this.tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT) && 
this.logWriteToken != null;
+
+        if (useBaseVersion) {
+          this.logVersion = HoodieLogFile.LOGFILE_BASE_VERSION;
+        } else {
+          // Compute from storage (expensive)
+          Option<Pair<Integer, String>> versionAndToken = 
FSUtils.getLatestLogVersion(this.storage, this.parentPath, this.logFileId, 
this.fileExtension, this.instantTime);
+          if (versionAndToken.isPresent()) {
+            this.logVersion = versionAndToken.get().getKey();
+            this.logWriteToken = versionAndToken.get().getValue();
+          } else {
+            this.logVersion = HoodieLogFile.LOGFILE_BASE_VERSION;
+            this.logWriteToken = UNKNOWN_WRITE_TOKEN;
+          }
+        }
+      }
+
+      if (this.logWriteToken == null) {
+        this.logWriteToken = UNKNOWN_WRITE_TOKEN;
+      }
+
+      if (this.suffix != null) {
+        // A little hacky to simplify the file name concatenation:
+        // patch the write token with an optional suffix
+        // instead of adding a new extension
+        this.logWriteToken = this.logWriteToken + this.suffix;
+      }
+
+      // Initialise logFile
+      StoragePath logPath = new StoragePath(parentPath,
+          FSUtils.makeLogFileName(this.logFileId, this.fileExtension, 
this.instantTime, this.logVersion, this.logWriteToken));
+      log.info("HoodieLogFile on path {}", logPath);
+      this.logFile = new HoodieLogFile(logPath, this.fileSize);
+    }
 
     /**
      * Append Block to a log file.
      * @return {@link AppendResult} containing result of the append.
      */
-    AppendResult appendBlock(HoodieLogBlock block) throws IOException, 
InterruptedException;
+    public abstract AppendResult appendBlock(HoodieLogBlock block) throws 
IOException, InterruptedException;
 
     /**
      * Appends the list of blocks to a logfile.
      * @return {@link AppendResult} containing result of the append.
      */
-    AppendResult appendBlocks(List<HoodieLogBlock> blocks) throws IOException, 
InterruptedException;
+    public abstract AppendResult appendBlocks(List<HoodieLogBlock> blocks) 
throws IOException, InterruptedException;
 
-    long getCurrentSize() throws IOException;
+    public abstract long getCurrentSize() throws IOException;
 
     /**
      * Force previously appended blocks to durable storage so that downstream
@@ -95,7 +204,7 @@ public interface HoodieLogFormat {
      * mainly for tests that assert per-append visibility on the underlying
      * file system.
      */
-    void sync() throws IOException;
+    public abstract void sync() throws IOException;
   }
 
   /**
@@ -110,204 +219,20 @@ public interface HoodieLogFormat {
 
     /**
      * Read log file in reverse order and check if prev block is present.
-     * 
+     *
      * @return {@code true} if previous block is present, {@code false} otherwise.
      */
     boolean hasPrev();
 
     /**
      * Read log file in reverse order and return prev block if present.
-     * 
+     *
      * @return {@link HoodieLogBlock} the previous block
      * @throws IOException
      */
     HoodieLogBlock prev() throws IOException;
   }
 
-  /**
-   * Builder class to construct the default log format writer.
-   */
-  class WriterBuilder {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(WriterBuilder.class);
-    // Default max log file size 512 MB
-    public static final long DEFAULT_SIZE_THRESHOLD = 512 * 1024 * 1024L;
-
-    // Buffer size
-    private Integer bufferSize;
-    // FileSystem
-    private HoodieStorage storage;
-    // Size threshold for the log file. Useful when used with a rolling log 
appender
-    private Long sizeThreshold;
-    // Log File extension. Could be .avro.delta or .avro.commits etc
-    private String fileExtension;
-    // File Id
-    private String logFileId;
-    // File Commit Time stamp
-    private String instantTime;
-    // version number for this log file. If not specified, then the current 
version will be
-    // computed by inspecting the file system
-    private Integer logVersion;
-    // file len of this log file
-    private Long fileLen = 0L;
-    // Location of the directory containing the log
-    private StoragePath parentPath;
-    // Log File Write Token
-    private String logWriteToken;
-    // optional file suffix
-    private String suffix;
-    // file creation hook
-    private LogFileCreationCallback fileCreationCallback;
-
-    private HoodieTableVersion tableVersion;
-
-    public WriterBuilder withBufferSize(int bufferSize) {
-      this.bufferSize = bufferSize;
-      return this;
-    }
-
-    public WriterBuilder withLogWriteToken(String logWriteToken) {
-      this.logWriteToken = logWriteToken;
-      return this;
-    }
-
-    public WriterBuilder withSuffix(String suffix) {
-      this.suffix = suffix;
-      return this;
-    }
-
-    public WriterBuilder withStorage(HoodieStorage storage) {
-      this.storage = storage;
-      return this;
-    }
-
-    public WriterBuilder withSizeThreshold(long sizeThreshold) {
-      this.sizeThreshold = sizeThreshold;
-      return this;
-    }
-
-    public WriterBuilder withFileExtension(String logFileExtension) {
-      this.fileExtension = logFileExtension;
-      return this;
-    }
-
-    public WriterBuilder withFileId(String fileId) {
-      this.logFileId = fileId;
-      return this;
-    }
-
-    public WriterBuilder withInstantTime(String instantTime) {
-      this.instantTime = instantTime;
-      return this;
-    }
-
-    public WriterBuilder withLogVersion(int version) {
-      this.logVersion = version;
-      return this;
-    }
-
-    public WriterBuilder withFileSize(long fileLen) {
-      this.fileLen = fileLen;
-      return this;
-    }
-
-    public WriterBuilder onParentPath(StoragePath parentPath) {
-      this.parentPath = parentPath;
-      return this;
-    }
-
-    public WriterBuilder withFileCreationCallback(LogFileCreationCallback 
fileCreationCallback) {
-      this.fileCreationCallback = fileCreationCallback;
-      return this;
-    }
-
-    public WriterBuilder withTableVersion(HoodieTableVersion 
writeTableVersion) {
-      this.tableVersion = writeTableVersion;
-      return this;
-    }
-
-    public Writer build() throws IOException {
-      LOG.info("Building HoodieLogFormat Writer");
-      if (storage == null) {
-        throw new IllegalArgumentException("fs is not specified");
-      }
-      if (logFileId == null) {
-        throw new IllegalArgumentException("FileID is not specified");
-      }
-      if (instantTime == null) {
-        throw new IllegalArgumentException("Instant time is not specified");
-      }
-      if (fileExtension == null) {
-        throw new IllegalArgumentException("File extension is not specified");
-      }
-      if (parentPath == null) {
-        throw new IllegalArgumentException("Log file parent location is not 
specified");
-      }
-
-      if (fileCreationCallback == null) {
-        // by default does nothing.
-        fileCreationCallback = new LogFileCreationCallback() {};
-      }
-
-      if (tableVersion == null) {
-        tableVersion = HoodieTableVersion.current();
-      }
-
-      if (logVersion == null) {
-        LOG.info("Computing the next log version for {} in {}", logFileId, 
parentPath);
-        boolean useBaseVersion = 
tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT)
-            &&  logWriteToken != null;
-        if (useBaseVersion) {
-          // the log format writer handles the existence check.
-          logVersion = HoodieLogFile.LOGFILE_BASE_VERSION;
-        } else {
-          // compute from storage (expensive)
-          Option<Pair<Integer, String>> versionAndWriteToken =
-              FSUtils.getLatestLogVersion(storage, parentPath, logFileId, 
fileExtension, instantTime);
-          if (versionAndWriteToken.isPresent()) {
-            logVersion = versionAndWriteToken.get().getKey();
-            logWriteToken = versionAndWriteToken.get().getValue();
-          } else {
-            // this is the case where there is no existing log-file.
-            logVersion = HoodieLogFile.LOGFILE_BASE_VERSION;
-            logWriteToken = UNKNOWN_WRITE_TOKEN;
-          }
-        }
-        LOG.info("Computed the next log version for {} in {} as {} with 
write-token {}", logFileId, parentPath, logVersion, logWriteToken);
-      }
-
-      if (logWriteToken == null) {
-        fileLen = 0L;
-        logWriteToken = UNKNOWN_WRITE_TOKEN;
-      }
-
-      if (suffix != null) {
-        // A little hacky to simplify the file name concatenation:
-        // patch the write token with an optional suffix
-        // instead of adding a new extension
-        logWriteToken = logWriteToken + suffix;
-      }
-
-      StoragePath logPath = new StoragePath(parentPath,
-          FSUtils.makeLogFileName(logFileId, fileExtension, instantTime, 
logVersion, logWriteToken));
-      LOG.info("HoodieLogFile on path {}", logPath);
-      HoodieLogFile logFile = new HoodieLogFile(logPath, fileLen);
-
-      if (sizeThreshold == null) {
-        sizeThreshold = DEFAULT_SIZE_THRESHOLD;
-      }
-      return (Writer) ReflectionUtils.loadClass(
-          DEFAULT_LOG_FORMAT_WRITER,
-          new Class[] {HoodieStorage.class, HoodieLogFile.class, 
Integer.class, Short.class, Long.class, String.class, 
LogFileCreationCallback.class},
-          storage, logFile, bufferSize, null, sizeThreshold, logWriteToken, 
fileCreationCallback
-      );
-    }
-  }
-
-  static WriterBuilder newWriterBuilder() {
-    return new WriterBuilder();
-  }
-
   static HoodieLogFormat.Reader newReader(HoodieStorage storage, HoodieLogFile logFile, HoodieSchema readerSchema)
       throws IOException {
     return new HoodieLogFileReader(storage, logFile, readerSchema, HoodieLogFileReader.DEFAULT_BUFFER_SIZE);
diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java
index 1d6acc8d85ef..2aed1d7dd87a 100644
--- a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java
@@ -19,13 +19,14 @@
 
 package org.apache.hudi.common.table.log;
 
-import org.apache.hudi.common.model.HoodieLogFile;
-import org.apache.hudi.common.table.log.HoodieLogFormat.WriterBuilder;
+import org.apache.hudi.common.table.HoodieTableVersion;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import org.apache.hudi.common.util.VisibleForTesting;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StoragePath;
 
+import lombok.Builder;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -42,38 +43,36 @@ import java.util.List;
 /**
  * HoodieLogFormatWriter can be used to append blocks to a log file Use HoodieLogFormat.WriterBuilder to construct.
  */
+@Getter
 @Slf4j
-public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
+public class HoodieLogFormatWriter extends HoodieLogFormat.Writer {
 
-  @Getter
-  private HoodieLogFile logFile;
-  private FSDataOutputStream output;
-
-  private final HoodieStorage storage;
-  @Getter
-  private final long sizeThreshold;
-  private final Integer bufferSize;
   private final Short replication;
-  private final String rolloverLogWriteToken;
-  private final LogFileCreationCallback fileCreationHook;
+  private FSDataOutputStream outputStream;
   private boolean closed = false;
   private transient Thread shutdownThread = null;
 
-  public HoodieLogFormatWriter(
-      HoodieStorage storage,
-      HoodieLogFile logFile,
+  @Builder(setterPrefix = "with")
+  private HoodieLogFormatWriter(
       Integer bufferSize,
-      Short replication,
+      HoodieStorage storage,
+      StoragePath parentPath,
+      String logFileId,
+      String fileExtension,
+      String instantTime,
+      Integer logVersion,
+      String logWriteToken,
+      String suffix,
+      Long fileSize,
       Long sizeThreshold,
-      String rolloverLogWriteToken,
-      LogFileCreationCallback fileCreationHook) {
-    this.storage = storage;
-    this.logFile = logFile;
-    this.sizeThreshold = sizeThreshold;
-    this.bufferSize = bufferSize != null ? bufferSize : storage.getDefaultBufferSize();
+      LogFileCreationCallback fileCreationCallback,
+      HoodieTableVersion tableVersion,
+      Short replication
+  ) throws IOException {
+    super(bufferSize, storage, parentPath, logFileId, fileExtension, instantTime, logVersion, logWriteToken,
+        suffix, fileSize, sizeThreshold, fileCreationCallback, tableVersion);
+    // outputStream is not initialized here, it will be lazily initialized in getOutputStream()
     this.replication = replication != null ? replication : storage.getDefaultReplication(logFile.getPath().getParent());
-    this.rolloverLogWriteToken = rolloverLogWriteToken;
-    this.fileCreationHook = fileCreationHook;
     addShutDownHook();
   }
 
@@ -81,8 +80,8 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
    * Overrides the output stream, only for test purpose.
    */
   @VisibleForTesting
-  public void withOutputStream(FSDataOutputStream output) {
-    this.output = output;
+  public void withOutputStream(FSDataOutputStream outputStream) {
+    this.outputStream = outputStream;
   }
 
   /**
@@ -91,7 +90,7 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
    * @throws IOException
    */
   private FSDataOutputStream getOutputStream() throws IOException {
-    if (this.output == null) {
+    if (outputStream == null) {
       boolean created = false;
       while (!created) {
         try {
@@ -116,7 +115,7 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
         }
       }
     }
-    return output;
+    return outputStream;
   }
 
   @Override
@@ -207,23 +206,32 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
 
   private void rolloverIfNeeded() throws IOException {
     // Roll over if the size is past the threshold
-    if (getCurrentSize() > sizeThreshold) {
-      log.info("CurrentSize {} has reached threshold {}. Rolling over to the next version", getCurrentSize(), sizeThreshold);
+    if (getCurrentSize() > getSizeThreshold()) {
+      log.info("CurrentSize {} has reached threshold {}. Rolling over to the next version", getCurrentSize(), getSizeThreshold());
       rollOver();
     }
   }
 
   private void rollOver() throws IOException {
     closeStream();
-    this.logFile = logFile.rollOver(rolloverLogWriteToken);
+    this.logFile = getLogFile().rollOver(getLogWriteToken());
     this.closed = false;
   }
 
   private void createNewFile() throws IOException {
-    fileCreationHook.preFileCreation(this.logFile);
-    this.output = new FSDataOutputStream(
-        storage.create(this.logFile.getPath(), false, bufferSize, replication, WriterBuilder.DEFAULT_SIZE_THRESHOLD),
-        new FileSystem.Statistics(storage.getScheme())
+    getFileCreationCallback().preFileCreation(this.getLogFile());
+    this.outputStream = new FSDataOutputStream(
+        getStorage().create(
+            this.getLogFile().getPath(),
+            false,
+            getBufferSize(),
+            getReplication(),
+            // HDFS block size is intentionally a fixed constant, independent of the
+            // log rollover threshold (getSizeThreshold()). A small rollover threshold
+            // must not shrink the underlying file's block size below HDFS limits.
+            DEFAULT_SIZE_THRESHOLD
+        ),
+        new FileSystem.Statistics(getStorage().getScheme())
     );
   }
 
@@ -237,25 +245,25 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
   }
 
   private void closeStream() throws IOException {
-    if (output != null) {
+    if (outputStream != null) {
       // Persist all buffered data to DataNodes before closing so downstream
       // readers can observe a fully-written log file at commit-level visibility.
       sync();
-      output.close();
-      output = null;
+      outputStream.close();
+      outputStream = null;
       closed = true;
     }
   }
 
   @Override
   public void sync() throws IOException {
-    if (output == null) {
+    if (outputStream == null) {
       return; // Presume closed
     }
-    output.flush();
+    outputStream.flush();
     // NOTE: the following API call makes sure that the data is flushed to disk on DataNodes (akin to POSIX fsync())
     // See more details here: https://issues.apache.org/jira/browse/HDFS-744
-    output.hsync();
+    outputStream.hsync();
   }
 
   @Override
@@ -264,27 +272,25 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
       throw new IllegalStateException("Cannot get current size as the underlying stream has been closed already");
     }
 
-    if (output == null) {
+    if (outputStream == null) {
       return 0;
     }
-    return output.getPos();
+    return outputStream.getPos();
   }
 
   /**
    * Close the output stream when the JVM exits.
    */
   private void addShutDownHook() {
-    shutdownThread = new Thread() {
-      public void run() {
-        try {
-          log.info("running HoodieLogFormatWriter shutdown hook to close output stream for log file: {}", logFile);
-          closeStream();
-        } catch (Exception e) {
-          log.warn("unable to close output stream for log file: {}", logFile, e);
-          // fail silently for any sort of exception
-        }
+    shutdownThread = new Thread(() -> {
+      try {
+        log.info("Running HoodieLogFormatWriter shutdown hook to close output stream for log file: {}", logFile);
+        closeStream();
+      } catch (Exception e) {
+        log.warn("Unable to close output stream for log file: {}", logFile, e);
+        // fail silently for any sort of exception
       }
-    };
+    });
     Runtime.getRuntime().addShutdownHook(shutdownThread);
   }
 }
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
index 2a3b2ced35c8..e63a27149d3c 100755
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
@@ -44,7 +44,6 @@ import org.apache.hudi.common.table.log.AppendResult;
 import org.apache.hudi.common.table.log.HoodieLogFileReader;
 import org.apache.hudi.common.table.log.HoodieLogFormat;
 import org.apache.hudi.common.table.log.HoodieLogFormat.Reader;
-import org.apache.hudi.common.table.log.HoodieLogFormat.Writer;
 import org.apache.hudi.common.table.log.HoodieLogFormatWriter;
 import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
 import org.apache.hudi.common.table.log.TestLogReaderUtils;
@@ -207,10 +206,14 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
 
   @Test
   public void testEmptyLog() throws IOException {
-    Writer writer =
-        HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
+    HoodieLogFormat.Writer writer =
+        HoodieLogFormatWriter.builder()
+            .withParentPath(partitionPath)
             .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-            
.withFileId("test-fileid1").withInstantTime("100").withStorage(storage).build();
+            .withLogFileId("test-fileid1")
+            .withInstantTime("100")
+            .withStorage(storage)
+            .build();
     assertEquals(0, writer.getCurrentSize(), "Just created this log, size 
should be 0");
     assertTrue(writer.getLogFile().getFileName().startsWith("."), "Check all 
log files should start with a .");
     assertEquals(1, writer.getLogFile().getLogVersion(), "Version should be 1 
for new log created");
@@ -220,15 +223,18 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
   @ParameterizedTest
   @EnumSource(names = {"AVRO_DATA_BLOCK", "HFILE_DATA_BLOCK", 
"PARQUET_DATA_BLOCK"})
   public void testBasicAppend(HoodieLogBlockType dataBlockType) throws 
IOException, InterruptedException, URISyntaxException {
-    Writer writer =
-        HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
+    HoodieLogFormat.Writer writer =
+        HoodieLogFormatWriter.builder()
+            .withParentPath(partitionPath)
             .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-            
.withFileId("test-fileid1").withInstantTime("100").withStorage(storage).build();
+            .withLogFileId("test-fileid1")
+            .withInstantTime("100")
+            .withStorage(storage)
+            .build();
     List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 100);
     Map<HeaderMetadataType, String> header = new HashMap<>();
     header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
     header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, 
getSimpleSchema().toString());
-    long pos = writer.getCurrentSize();
     HoodieDataBlock dataBlock = getDataBlock(dataBlockType, records, header);
     AppendResult result = writer.appendBlock(dataBlock);
 
@@ -245,10 +251,14 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
 
   @Test
   public void testRollover() throws IOException, InterruptedException, 
URISyntaxException {
-    Writer writer =
-        HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
+    HoodieLogFormat.Writer writer =
+        HoodieLogFormatWriter.builder()
+            .withParentPath(partitionPath)
             .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-            
.withFileId("test-fileid1").withInstantTime("100").withStorage(storage).build();
+            .withLogFileId("test-fileid1")
+            .withInstantTime("100")
+            .withStorage(storage)
+            .build();
     List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 100);
     Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
     header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
@@ -265,10 +275,14 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
 
     // Create a writer with the size threshold as the size we just wrote - so 
this has to roll
     writer =
-        HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
+        HoodieLogFormatWriter.builder()
+            .withParentPath(partitionPath)
             .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-            
.withFileId("test-fileid1").withInstantTime("100").withStorage(storage)
-            .withSizeThreshold(size - 1).build();
+            .withLogFileId("test-fileid1")
+            .withInstantTime("100")
+            .withStorage(storage)
+            .withSizeThreshold(size - 1)
+            .build();
     records = SchemaTestUtil.generateTestRecords(0, 100);
     dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records, header);
     AppendResult secondAppend = writer.appendBlock(dataBlock);
@@ -306,14 +320,19 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
   }
 
   private void testConcurrentAppend(boolean logFileExists, boolean 
newLogFileFormat) throws Exception {
-    HoodieLogFormat.WriterBuilder builder1 =
-        HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
-            
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1")
-            .withInstantTime("100").withStorage(storage);
-    HoodieLogFormat.WriterBuilder builder2 =
-        HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
-            
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1")
-            .withInstantTime("100").withStorage(storage);
+    HoodieLogFormatWriter.HoodieLogFormatWriterBuilder builder1 = 
HoodieLogFormatWriter.builder()
+        .withParentPath(partitionPath)
+        .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
+        .withLogFileId("test-fileid1")
+        .withInstantTime("100")
+        .withStorage(storage);
+
+    HoodieLogFormatWriter.HoodieLogFormatWriterBuilder builder2 = 
HoodieLogFormatWriter.builder()
+        .withParentPath(partitionPath)
+        .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
+        .withLogFileId("test-fileid1")
+        .withInstantTime("100")
+        .withStorage(storage);
 
     if (newLogFileFormat && logFileExists) {
       // Assume there is an existing log-file with write token
@@ -330,14 +349,14 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
     } else {
       builder1 = 
builder1.withLogVersion(1).withLogWriteToken(HoodieLogFormat.UNKNOWN_WRITE_TOKEN);
     }
-    Writer writer = builder1.build();
+    HoodieLogFormat.Writer writer = builder1.build();
     List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 100);
     Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
     header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
     header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, 
getSimpleSchema().toString());
     HoodieDataBlock dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records, 
header);
     writer.appendBlock(dataBlock);
-    Writer writer2 = builder2.build();
+    HoodieLogFormat.Writer writer2 = builder2.build();
     writer2.appendBlock(dataBlock);
     HoodieLogFile logFile1 = writer.getLogFile();
     HoodieLogFile logFile2 = writer2.getLogFile();
@@ -350,11 +369,15 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
   @ParameterizedTest
   @EnumSource(names = {"AVRO_DATA_BLOCK", "HFILE_DATA_BLOCK", 
"PARQUET_DATA_BLOCK"})
   public void testMultipleAppend(HoodieLogBlockType dataBlockType) throws 
IOException, URISyntaxException, InterruptedException {
-    Writer writer =
-        HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
+    HoodieLogFormat.Writer writer =
+        HoodieLogFormatWriter.builder()
+            .withParentPath(partitionPath)
             .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-            
.withFileId("test-fileid1").withLogVersion(1).withInstantTime("100")
-            .withStorage(storage).build();
+            .withLogFileId("test-fileid1")
+            .withLogVersion(1)
+            .withInstantTime("100")
+            .withStorage(storage)
+            .build();
     List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 100);
     Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
     header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
@@ -365,10 +388,14 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
     writer.close();
 
     writer =
-        HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
+        HoodieLogFormatWriter.builder()
+            .withParentPath(partitionPath)
             .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-            
.withFileId("test-fileid1").withLogVersion(1).withInstantTime("100")
-            .withStorage(storage).build();
+            .withLogFileId("test-fileid1")
+            .withLogVersion(1)
+            .withInstantTime("100")
+            .withStorage(storage)
+            .build();
     ((HoodieLogFormatWriter) writer).withOutputStream((FSDataOutputStream)
         storage.append(writer.getLogFile().getPath()));
     records = SchemaTestUtil.generateTestRecords(0, 100);
@@ -384,10 +411,14 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
 
     // Close and Open again and append 100 more records
     writer =
-        HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
+        HoodieLogFormatWriter.builder()
+            .withParentPath(partitionPath)
             .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-            
.withFileId("test-fileid1").withLogVersion(1).withInstantTime("100")
-            .withStorage(storage).build();
+            .withLogFileId("test-fileid1")
+            .withLogVersion(1)
+            .withInstantTime("100")
+            .withStorage(storage)
+            .build();
     ((HoodieLogFormatWriter) writer).withOutputStream(
         (FSDataOutputStream) storage.append(writer.getLogFile().getPath()));
     records = SchemaTestUtil.generateTestRecords(0, 100);
@@ -402,7 +433,7 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
     writer.close();
 
     // Cannot get the current size after closing the log
-    final Writer closedWriter = writer;
+    final HoodieLogFormat.Writer closedWriter = writer;
     assertThrows(IllegalStateException.class, closedWriter::getCurrentSize, 
"getCurrentSize should fail after the logAppender is closed");
   }
 
@@ -424,8 +455,10 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
     HoodieDataBlock dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records, 
header);
 
     for (int i = 0; i < 2; i++) {
-      Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(testPath)
-          
.withFileExtension(HoodieArchivedLogFile.ARCHIVE_EXTENSION).withFileId("commits")
+      HoodieLogFormat.Writer writer = HoodieLogFormatWriter.builder()
+          .withParentPath(testPath)
+          .withFileExtension(HoodieArchivedLogFile.ARCHIVE_EXTENSION)
+          .withLogFileId("commits")
           .withInstantTime("")
           .withStorage(localStorage).build();
       writer.appendBlock(dataBlock);
@@ -440,11 +473,12 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
   @ParameterizedTest
   @ValueSource(ints = {6, 8})
   public void testBasicWriteAndScan(int tableVersion) throws IOException, 
URISyntaxException, InterruptedException {
-    Writer writer =
-        HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
+    HoodieLogFormat.Writer writer =
+        HoodieLogFormatWriter.builder()
+            .withParentPath(partitionPath)
             .withTableVersion(HoodieTableVersion.fromVersionCode(tableVersion))
             .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-            
.withFileId("test-fileid1").withInstantTime("100").withStorage(storage).build();
+            
.withLogFileId("test-fileid1").withInstantTime("100").withStorage(storage).build();
     HoodieSchema schema = getSimpleSchema();
     List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 100);
     List<IndexedRecord> copyOfRecords = records.stream()
@@ -475,10 +509,12 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
 
   @Test
   public void testHugeLogFileWrite() throws IOException, URISyntaxException, 
InterruptedException {
-    Writer writer =
-        HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
+    HoodieLogFormat.Writer writer =
+        HoodieLogFormatWriter.builder().withParentPath(partitionPath)
             .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-            
.withFileId("test-fileid1").withInstantTime("100").withStorage(storage)
+            .withLogFileId("test-fileid1")
+            .withInstantTime("100")
+            .withStorage(storage)
             .withSizeThreshold(3L * 1024 * 1024 * 1024)
             .build();
     HoodieSchema schema = getSimpleSchema();
@@ -525,10 +561,10 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
   @ParameterizedTest
   @EnumSource(names = {"AVRO_DATA_BLOCK", "HFILE_DATA_BLOCK", 
"PARQUET_DATA_BLOCK"})
   public void testBasicAppendAndRead(HoodieLogBlockType dataBlockType) throws 
IOException, URISyntaxException, InterruptedException {
-    Writer writer = HoodieLogFormat.newWriterBuilder()
-        .onParentPath(partitionPath)
+    HoodieLogFormat.Writer writer = HoodieLogFormatWriter.builder()
+        .withParentPath(partitionPath)
         .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-        .withFileId("test-fileid1")
+        .withLogFileId("test-fileid1")
         .withInstantTime("100")
         .withStorage(storage)
         .build();
@@ -543,10 +579,10 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
     writer.appendBlock(dataBlock);
     writer.close();
 
-    writer = HoodieLogFormat.newWriterBuilder()
-        .onParentPath(partitionPath)
+    writer = HoodieLogFormatWriter.builder()
+        .withParentPath(partitionPath)
         .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-        .withFileId("test-fileid1")
+        .withLogFileId("test-fileid1")
         .withInstantTime("100")
         .withStorage(storage)
         .build();
@@ -562,10 +598,10 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
     writer.close();
 
     // Close and Open again and append 100 more records
-    writer = HoodieLogFormat.newWriterBuilder()
-        .onParentPath(partitionPath)
+    writer = HoodieLogFormatWriter.builder()
+        .withParentPath(partitionPath)
         .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-        .withFileId("test-fileid1")
+        .withLogFileId("test-fileid1")
         .withInstantTime("100")
         .withStorage(storage)
         .build();
@@ -614,10 +650,10 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
 
   @Test
   public void testCDCBlock() throws IOException, InterruptedException {
-    Writer writer = HoodieLogFormat.newWriterBuilder()
-        .onParentPath(partitionPath)
+    HoodieLogFormat.Writer writer = HoodieLogFormatWriter.builder()
+        .withParentPath(partitionPath)
         .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-        .withFileId("test-fileid1")
+        .withLogFileId("test-fileid1")
         .withInstantTime("100")
         .withStorage(storage)
         .build();
@@ -1062,10 +1098,13 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
   }
 
   private HoodieLogFile addValidBlock(String fileId, String commitTime, int 
numRecords) throws IOException, URISyntaxException, InterruptedException {
-    Writer writer =
-        HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
+    HoodieLogFormat.Writer writer =
+        HoodieLogFormatWriter.builder().withParentPath(partitionPath)
             .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-            
.withFileId(fileId).withInstantTime(commitTime).withStorage(storage).build();
+            .withLogFileId(fileId)
+            .withInstantTime(commitTime)
+            .withStorage(storage)
+            .build();
     List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 
numRecords);
     Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
     header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
@@ -1079,10 +1118,13 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
   private HoodieLogFile appendValidBlock(StoragePath path, String fileId, 
String commitTime,
                                          int numRecords)
       throws IOException, URISyntaxException, InterruptedException {
-    Writer writer =
-        HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
+    HoodieLogFormat.Writer writer =
+        HoodieLogFormatWriter.builder().withParentPath(partitionPath)
             .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-            
.withFileId(fileId).withInstantTime(commitTime).withStorage(storage).build();
+            .withLogFileId(fileId)
+            .withInstantTime(commitTime)
+            .withStorage(storage)
+            .build();
     ((HoodieLogFormatWriter) writer).withOutputStream(
         (FSDataOutputStream) storage.append(path));
     List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 
numRecords);
@@ -1097,10 +1139,13 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
 
   @Test
   public void testValidateCorruptBlockEndPosition() throws IOException, 
URISyntaxException, InterruptedException {
-    Writer writer =
-        HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
+    HoodieLogFormat.Writer writer =
+        HoodieLogFormatWriter.builder().withParentPath(partitionPath)
             .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-            
.withFileId("test-fileid1").withInstantTime("100").withStorage(storage).build();
+            .withLogFileId("test-fileid1")
+            .withInstantTime("100")
+            .withStorage(storage)
+            .build();
     List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 100);
     Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
     header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
@@ -1153,11 +1198,14 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
       throws IOException, URISyntaxException, InterruptedException {
     HoodieSchema schema = 
HoodieSchemaUtils.addMetadataFields(getSimpleSchema());
     // Set a small threshold so that every block is a new version
-    Writer writer =
-        HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
+    HoodieLogFormat.Writer writer =
+        HoodieLogFormatWriter.builder().withParentPath(partitionPath)
             .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-            
.withFileId("test-fileid1").withInstantTime("100").withStorage(storage)
-            .withSizeThreshold(500).build();
+            .withLogFileId("test-fileid1")
+            .withInstantTime("100")
+            .withStorage(storage)
+            .withSizeThreshold(500L)
+            .build();
     SchemaTestUtil testUtil = new SchemaTestUtil();
 
     // Write 1
@@ -1197,10 +1245,13 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
       throws IOException, URISyntaxException, InterruptedException {
     HoodieSchema schema = 
HoodieSchemaUtils.addMetadataFields(getSimpleSchema());
     // Set a small threshold so that every block is a new version
-    Writer writer =
-        HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
+    HoodieLogFormat.Writer writer =
+        HoodieLogFormatWriter.builder().withParentPath(partitionPath)
             .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-            
.withFileId("test-fileid1").withInstantTime("100").withStorage(storage).build();
+            .withLogFileId("test-fileid1")
+            .withInstantTime("100")
+            .withStorage(storage)
+            .build();
 
     // Write 1
     SchemaTestUtil testUtil = new SchemaTestUtil();
@@ -1260,10 +1311,13 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
       throws IOException, URISyntaxException, InterruptedException {
     HoodieSchema schema = 
HoodieSchemaUtils.addMetadataFields(getSimpleSchema());
     // Set a small threshold so that every block is a new version
-    Writer writer =
-        HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
+    HoodieLogFormat.Writer writer =
+        HoodieLogFormatWriter.builder().withParentPath(partitionPath)
             .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-            
.withFileId("test-fileid1").withInstantTime("100").withStorage(storage).build();
+            .withLogFileId("test-fileid1")
+            .withInstantTime("100")
+            .withStorage(storage)
+            .build();
 
     // Write 1
     SchemaTestUtil testUtil = new SchemaTestUtil();
@@ -1298,9 +1352,12 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
     outputStream.close();
 
     writer =
-        HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
+        HoodieLogFormatWriter.builder().withParentPath(partitionPath)
             .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-            
.withFileId("test-fileid1").withInstantTime("100").withStorage(storage).build();
+            .withLogFileId("test-fileid1")
+            .withInstantTime("100")
+            .withStorage(storage)
+            .build();
     // Write 3
     header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "103");
     List<IndexedRecord> records3 = testUtil.generateHoodieTestRecords(0, 100);
@@ -1330,10 +1387,13 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
       throws IOException, URISyntaxException, InterruptedException {
     HoodieSchema schema = 
HoodieSchemaUtils.addMetadataFields(getSimpleSchema());
     // Set a small threshold so that every block is a new version
-    Writer writer =
-        HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
+    HoodieLogFormat.Writer writer =
+        HoodieLogFormatWriter.builder().withParentPath(partitionPath)
             .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-            
.withFileId("test-fileid1").withInstantTime("100").withStorage(storage).build();
+            .withLogFileId("test-fileid1")
+            .withInstantTime("100")
+            .withStorage(storage)
+            .build();
 
     // Write 1
     SchemaTestUtil testUtil = new SchemaTestUtil();
@@ -1471,10 +1531,13 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
     HoodieSchema schema = 
HoodieSchemaUtils.addMetadataFields(getSimpleSchema());
     // Set a small threshold so that every block is a new version
     String fileId = "test-fileid111";
-    Writer writer =
-        HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
+    HoodieLogFormat.Writer writer =
+        HoodieLogFormatWriter.builder().withParentPath(partitionPath)
             .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-            
.withFileId(fileId).withInstantTime("100").withStorage(storage).build();
+            .withLogFileId(fileId)
+            .withInstantTime("100")
+            .withStorage(storage)
+            .build();
 
     // Write 1 -> 100 records are written
     SchemaTestUtil testUtil = new SchemaTestUtil();
@@ -1579,10 +1642,13 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
       throws IOException, URISyntaxException, InterruptedException {
     HoodieSchema schema = 
HoodieSchemaUtils.addMetadataFields(getSimpleSchema());
     // Set a small threshold so that every block is a new version
-    Writer writer =
-        HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
+    HoodieLogFormat.Writer writer =
+        HoodieLogFormatWriter.builder().withParentPath(partitionPath)
             .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-            
.withFileId("test-fileid1").withInstantTime("100").withStorage(storage).build();
+            .withLogFileId("test-fileid1")
+            .withInstantTime("100")
+            .withStorage(storage)
+            .build();
 
     // Write 1
     SchemaTestUtil testUtil = new SchemaTestUtil();
@@ -1712,10 +1778,13 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
     // Write a Data block and Delete block with same InstantTime (written in 
same batch)
     HoodieSchema schema = 
HoodieSchemaUtils.addMetadataFields(getSimpleSchema());
     // Set a small threshold so that every block is a new version
-    Writer writer =
-        HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
+    HoodieLogFormat.Writer writer =
+        HoodieLogFormatWriter.builder().withParentPath(partitionPath)
             .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-            
.withFileId("test-fileid1").withInstantTime("100").withStorage(storage).build();
+            .withLogFileId("test-fileid1")
+            .withInstantTime("100")
+            .withStorage(storage)
+            .build();
 
     // Write 1
     SchemaTestUtil testUtil = new SchemaTestUtil();
@@ -1781,10 +1850,13 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
     // Write a Data block and Delete block with same InstantTime (written in 
same batch)
     HoodieSchema schema = 
HoodieSchemaUtils.addMetadataFields(getSimpleSchema());
     // Set a small threshold so that every block is a new version
-    Writer writer =
-        HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
+    HoodieLogFormat.Writer writer =
+        HoodieLogFormatWriter.builder().withParentPath(partitionPath)
             .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-            
.withFileId("test-fileid1").withInstantTime("100").withStorage(storage).build();
+            .withLogFileId("test-fileid1")
+            .withInstantTime("100")
+            .withStorage(storage)
+            .build();
 
     // Write 1
     SchemaTestUtil testUtil = new SchemaTestUtil();
@@ -1833,10 +1905,13 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
       throws IOException, URISyntaxException, InterruptedException {
     HoodieSchema schema = 
HoodieSchemaUtils.addMetadataFields(getSimpleSchema());
     // Set a small threshold so that every block is a new version
-    Writer writer =
-        HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
+    HoodieLogFormat.Writer writer =
+        HoodieLogFormatWriter.builder().withParentPath(partitionPath)
             .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-            
.withFileId("test-fileid1").withInstantTime("100").withStorage(storage).build();
+            .withLogFileId("test-fileid1")
+            .withInstantTime("100")
+            .withStorage(storage)
+            .build();
 
     // Write 1
     SchemaTestUtil testUtil = new SchemaTestUtil();
@@ -1870,10 +1945,13 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
     // Write a 3 Data blocs with same InstantTime (written in same batch)
     HoodieSchema schema = 
HoodieSchemaUtils.addMetadataFields(getSimpleSchema());
     // Set a small threshold so that every block is a new version
-    Writer writer =
-        HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
+    HoodieLogFormat.Writer writer =
+        HoodieLogFormatWriter.builder().withParentPath(partitionPath)
             .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-            
.withFileId("test-fileid1").withInstantTime("100").withStorage(storage).build();
+            .withLogFileId("test-fileid1")
+            .withInstantTime("100")
+            .withStorage(storage)
+            .build();
 
     // Write 1
     SchemaTestUtil testUtil = new SchemaTestUtil();
@@ -1924,10 +2002,13 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
       throws IOException, URISyntaxException, InterruptedException {
     HoodieSchema schema = 
HoodieSchemaUtils.addMetadataFields(getSimpleSchema());
     // Set a small threshold so that every block is a new version
-    Writer writer =
-        HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
+    HoodieLogFormat.Writer writer =
+        HoodieLogFormatWriter.builder().withParentPath(partitionPath)
             .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-            
.withFileId("test-fileid1").withInstantTime("100").withStorage(storage).build();
+            .withLogFileId("test-fileid1")
+            .withInstantTime("100")
+            .withStorage(storage)
+            .build();
     List<String> deleteKeyListInV2Block = Arrays.asList(
         "d448e1b8-a0d4-45c0-bf2d-a9e16ff3c8ce",
         "df3f71cd-5b68-406c-bb70-861179444adb",
@@ -2047,10 +2128,13 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
       throws IOException, URISyntaxException, InterruptedException {
     HoodieSchema schema = 
HoodieSchemaUtils.addMetadataFields(getSimpleSchema());
     // Set a small threshold so that every block is a new version
-    Writer writer =
-        HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
+    HoodieLogFormat.Writer writer =
+        HoodieLogFormatWriter.builder().withParentPath(partitionPath)
             .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-            
.withFileId("test-fileid1").withInstantTime("100").withStorage(storage).build();
+            .withLogFileId("test-fileid1")
+            .withInstantTime("100")
+            .withStorage(storage)
+            .build();
 
     // Write 1
     SchemaTestUtil testUtil = new SchemaTestUtil();
@@ -2107,10 +2191,13 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
     // Write a 3 Data blocs with same InstantTime (written in same batch)
     HoodieSchema schema = 
HoodieSchemaUtils.addMetadataFields(getSimpleSchema());
     // Set a small threshold so that every block is a new version
-    Writer writer =
-        HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
+    HoodieLogFormat.Writer writer =
+        HoodieLogFormatWriter.builder().withParentPath(partitionPath)
             .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-            
.withFileId("test-fileid1").withInstantTime("100").withStorage(storage).build();
+            .withLogFileId("test-fileid1")
+            .withInstantTime("100")
+            .withStorage(storage)
+            .build();
 
     // Write 1
     SchemaTestUtil testUtil = new SchemaTestUtil();
@@ -2152,9 +2239,13 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
     outputStream.close();
 
     writer =
-        HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
+        HoodieLogFormatWriter.builder()
+            .withParentPath(partitionPath)
             .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-            
.withFileId("test-fileid1").withInstantTime("100").withStorage(storage).build();
+            .withLogFileId("test-fileid1")
+            .withInstantTime("100")
+            .withStorage(storage)
+            .build();
 
     writer.appendBlock(dataBlock);
     writer.close();
@@ -2172,9 +2263,12 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
     outputStream.close();
 
     writer =
-        HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
+        HoodieLogFormatWriter.builder().withParentPath(partitionPath)
             .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-            
.withFileId("test-fileid1").withInstantTime("100").withStorage(storage).build();
+            .withLogFileId("test-fileid1")
+            .withInstantTime("100")
+            .withStorage(storage)
+            .build();
     // Write 1 rollback block for the last commit instant
     header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "101");
     header.put(HeaderMetadataType.TARGET_INSTANT_TIME, "100");
@@ -2202,10 +2296,13 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
     // Write a 3 Data blocks with same InstantTime (written in same batch)
     HoodieSchema schema = 
HoodieSchemaUtils.addMetadataFields(getSimpleSchema());
     // Set a small threshold so that every block is a new version
-    Writer writer =
-        HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
+    HoodieLogFormat.Writer writer =
+        HoodieLogFormatWriter.builder().withParentPath(partitionPath)
             .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-            
.withFileId("test-fileid1").withInstantTime("100").withStorage(storage).build();
+            .withLogFileId("test-fileid1")
+            .withInstantTime("100")
+            .withStorage(storage)
+            .build();
 
     // Write 1st data blocks multiple times.
     SchemaTestUtil testUtil = new SchemaTestUtil();
@@ -2275,9 +2372,12 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
     outputStream.close();
 
     writer =
-        HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
+        HoodieLogFormatWriter.builder().withParentPath(partitionPath)
             .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-            
.withFileId("test-fileid1").withInstantTime("100").withStorage(storage).build();
+            .withLogFileId("test-fileid1")
+            .withInstantTime("100")
+            .withStorage(storage)
+            .build();
     ((HoodieLogFormatWriter) writer).withOutputStream(
         (FSDataOutputStream) storage.append(writer.getLogFile().getPath()));
 
@@ -2405,9 +2505,13 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
       List<IndexedRecord> records2 = new ArrayList<>(records);
 
       // Write1 with numRecordsInLog1 records written to log.1
-      Writer writer = 
HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
-          
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1")
-          .withInstantTime("100").withStorage(storage).build();
+      HoodieLogFormat.Writer writer = HoodieLogFormatWriter.builder()
+          .withParentPath(partitionPath)
+          .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
+          .withLogFileId("test-fileid1")
+          .withInstantTime("100")
+          .withStorage(storage)
+          .build();
 
       Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
       header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
@@ -2419,9 +2523,14 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
       writer.close();
 
       // write2 with numRecordsInLog2 records written to log.2
-      Writer writer2 = 
HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
-          
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1")
-          .withInstantTime("100").withStorage(storage).withSizeThreshold(size 
- 1).build();
+      HoodieLogFormat.Writer writer2 = HoodieLogFormatWriter.builder()
+          .withParentPath(partitionPath)
+          .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
+          .withLogFileId("test-fileid1")
+          .withInstantTime("100")
+          .withStorage(storage)
+          .withSizeThreshold(size - 1)
+          .build();
 
       Map<HoodieLogBlock.HeaderMetadataType, String> header2 = new HashMap<>();
       header2.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
@@ -2498,10 +2607,14 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
   @Test
   public void testBasicAppendAndReadInReverse()
       throws IOException, URISyntaxException, InterruptedException {
-    Writer writer =
-        HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
+    HoodieLogFormat.Writer writer =
+        HoodieLogFormatWriter.builder()
+            .withParentPath(partitionPath)
             .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-            
.withFileId("test-fileid1").withInstantTime("100").withStorage(storage).build();
+            .withLogFileId("test-fileid1")
+            .withInstantTime("100")
+            .withStorage(storage)
+            .build();
     HoodieSchema schema = getSimpleSchema();
     List<IndexedRecord> records1 = SchemaTestUtil.generateTestRecords(0, 100);
     List<IndexedRecord> copyOfRecords1 = records1.stream()
@@ -2568,10 +2681,13 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
   @Test
   public void testAppendAndReadOnCorruptedLogInReverse()
       throws IOException, URISyntaxException, InterruptedException {
-    Writer writer =
-        HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
+    HoodieLogFormat.Writer writer =
+        HoodieLogFormatWriter.builder().withParentPath(partitionPath)
             .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-            
.withFileId("test-fileid1").withInstantTime("100").withStorage(storage).build();
+            .withLogFileId("test-fileid1")
+            .withInstantTime("100")
+            .withStorage(storage)
+            .build();
     HoodieSchema schema = getSimpleSchema();
     List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 100);
     Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
@@ -2602,9 +2718,12 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
 
     // Should be able to append a new block
     writer =
-        HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
+        HoodieLogFormatWriter.builder().withParentPath(partitionPath)
             .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-            
.withFileId("test-fileid1").withInstantTime("100").withStorage(storage).build();
+            .withLogFileId("test-fileid1")
+            .withInstantTime("100")
+            .withStorage(storage)
+            .build();
     ((HoodieLogFormatWriter) writer).withOutputStream(
         (FSDataOutputStream) storage.append(writer.getLogFile().getPath()));
     records = SchemaTestUtil.generateTestRecords(0, 100);
@@ -2630,10 +2749,13 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
   @Test
   public void testBasicAppendAndTraverseInReverse()
       throws IOException, URISyntaxException, InterruptedException {
-    Writer writer =
-        HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
+    HoodieLogFormat.Writer writer =
+        HoodieLogFormatWriter.builder().withParentPath(partitionPath)
             .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-            
.withFileId("test-fileid1").withInstantTime("100").withStorage(storage).build();
+            .withLogFileId("test-fileid1")
+            .withInstantTime("100")
+            .withStorage(storage)
+            .build();
     HoodieSchema schema = getSimpleSchema();
     List<IndexedRecord> records1 = SchemaTestUtil.generateTestRecords(0, 100);
     List<IndexedRecord> copyOfRecords1 = records1.stream()
@@ -2717,10 +2839,10 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
   public void testDataBlockFormatAppendAndReadWithProjectedSchema(
       HoodieLogBlockType dataBlockType
   ) throws IOException, URISyntaxException, InterruptedException {
-    Writer writer = HoodieLogFormat.newWriterBuilder()
-        .onParentPath(partitionPath)
+    HoodieLogFormat.Writer writer = HoodieLogFormatWriter.builder()
+        .withParentPath(partitionPath)
         .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-        .withFileId("test-fileid1")
+        .withLogFileId("test-fileid1")
         .withInstantTime("100")
         .withStorage(storage)
         .build();
@@ -2862,10 +2984,14 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
 
   private HoodieLogFormat.Reader createCorruptedFile(String fileId) throws 
Exception {
     // block is corrupted, but check is skipped.
-    Writer writer =
-        HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
+    HoodieLogFormat.Writer writer =
+        HoodieLogFormatWriter.builder()
+            .withParentPath(partitionPath)
             .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-            
.withFileId(fileId).withInstantTime("100").withStorage(storage).build();
+            .withLogFileId(fileId)
+            .withInstantTime("100")
+            .withStorage(storage)
+            .build();
     List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 100);
     Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
     header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormatAppendFailure.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormatAppendFailure.java
index 7596c17d5209..125527541b65 100644
--- 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormatAppendFailure.java
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormatAppendFailure.java
@@ -21,8 +21,8 @@ package org.apache.hudi.common.functional;
 import org.apache.hudi.common.model.HoodieArchivedLogFile;
 import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.table.log.HoodieLogFormat;
 import org.apache.hudi.common.table.log.HoodieLogFormat.Writer;
+import org.apache.hudi.common.table.log.HoodieLogFormatWriter;
 import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
 import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock;
@@ -120,9 +120,13 @@ public class TestHoodieLogFormatAppendFailure {
     HoodieAvroDataBlock dataBlock =
         new HoodieAvroDataBlock(records, header, 
HoodieRecord.RECORD_KEY_METADATA_FIELD);
 
-    Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(testPath)
-        
.withFileExtension(HoodieArchivedLogFile.ARCHIVE_EXTENSION).withFileId("commits")
-        .withInstantTime("").withStorage(storage).build();
+    Writer writer = HoodieLogFormatWriter.builder()
+        .withParentPath(testPath)
+        .withFileExtension(HoodieArchivedLogFile.ARCHIVE_EXTENSION)
+        .withLogFileId("commits")
+        .withInstantTime("")
+        .withStorage(storage)
+        .build();
 
     writer.appendBlock(dataBlock);
     // get the current log file version to compare later
@@ -152,9 +156,13 @@ public class TestHoodieLogFormatAppendFailure {
 
     // Opening a new Writer right now will throw IOException. The code should 
handle this, rollover the logfile and
     // return a new writer with a bumped up logVersion
-    writer = HoodieLogFormat.newWriterBuilder().onParentPath(testPath)
-        
.withFileExtension(HoodieArchivedLogFile.ARCHIVE_EXTENSION).withFileId("commits")
-        .withInstantTime("").withStorage(storage).build();
+    writer = HoodieLogFormatWriter.builder()
+        .withParentPath(testPath)
+        .withFileExtension(HoodieArchivedLogFile.ARCHIVE_EXTENSION)
+        .withLogFileId("commits")
+        .withInstantTime("")
+        .withStorage(storage)
+        .build();
     header = new HashMap<>();
     header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE,
         
String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_BLOCK.ordinal()));
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java
index 193bb0173dfd..f2c6d2b49b20 100644
--- 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.schema.HoodieSchemaField;
 import org.apache.hudi.common.schema.HoodieSchemaType;
 import org.apache.hudi.common.table.log.HoodieLogFormat;
+import org.apache.hudi.common.table.log.HoodieLogFormatWriter;
 import org.apache.hudi.common.table.log.block.HoodieDataBlock;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -195,8 +196,13 @@ class TestTableSchemaResolver {
   private StoragePath writeLogFile(StoragePath partitionPath, Schema schema) 
throws IOException, URISyntaxException, InterruptedException {
     HoodieStorage storage = HoodieTestUtils.getStorage(partitionPath);
     HoodieLogFormat.Writer writer =
-        
HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-            
.withFileId("test-fileid1").withInstantTime("100").withStorage(storage).build();
+        HoodieLogFormatWriter.builder()
+            .withParentPath(partitionPath)
+            .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
+            .withLogFileId("test-fileid1")
+            .withInstantTime("100")
+            .withStorage(storage)
+            .build();
     List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 100);
     Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
     header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/log/block/TestHoodieLogWriterBuilder.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/log/block/TestHoodieLogFormatWriterBuilder.java
similarity index 87%
rename from 
hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/log/block/TestHoodieLogWriterBuilder.java
rename to 
hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/log/block/TestHoodieLogFormatWriterBuilder.java
index e362a033cfe4..c7ed090fb85f 100644
--- 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/log/block/TestHoodieLogWriterBuilder.java
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/log/block/TestHoodieLogFormatWriterBuilder.java
@@ -23,6 +23,7 @@ package org.apache.hudi.common.table.log.block;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.table.HoodieTableVersion;
 import org.apache.hudi.common.table.log.HoodieLogFormat;
+import org.apache.hudi.common.table.log.HoodieLogFormatWriter;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;
 
@@ -38,21 +39,21 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
 
 /**
- * Test class for {@link HoodieLogFormat#newWriterBuilder()}.
+ * Test class for {@link HoodieLogFormatWriter#builder()}.
  */
-public class TestHoodieLogWriterBuilder {
+public class TestHoodieLogFormatWriterBuilder {
 
-  HoodieLogFormat.WriterBuilder builder;
+  HoodieLogFormatWriter.HoodieLogFormatWriterBuilder builder;
   HoodieLogFormat.Writer writer;
   HoodieStorage storage;
 
   @BeforeEach
   public void setup() {
     storage = mock(HoodieStorage.class);
-    builder = HoodieLogFormat.newWriterBuilder()
-        .withFileId("test-fileid1")
+    builder = HoodieLogFormatWriter.builder()
+        .withLogFileId("test-fileid1")
         .withInstantTime("100")
-        .onParentPath(new StoragePath("/tmp"))
+        .withParentPath(new StoragePath("/tmp"))
         .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
         .withStorage(storage);
   }
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/timeline/TestArchivedTimelineV1.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/timeline/TestArchivedTimelineV1.java
index 3142338119a9..d576087bb3f1 100644
--- 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/timeline/TestArchivedTimelineV1.java
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/timeline/TestArchivedTimelineV1.java
@@ -40,6 +40,7 @@ import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.log.HoodieLogFormat;
 import org.apache.hudi.common.table.log.HoodieLogFormat.Writer;
+import org.apache.hudi.common.table.log.HoodieLogFormatWriter;
 import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import org.apache.hudi.common.table.timeline.versioning.clean.CleanPlanV2MigrationHandler;
@@ -835,16 +836,23 @@ public class TestArchivedTimelineV1 extends HoodieCommonTestHarness {
   }
 
   private Writer buildWriter(StoragePath archiveFilePath) throws IOException {
-    return HoodieLogFormat.newWriterBuilder().onParentPath(archiveFilePath.getParent())
-        .withFileId(archiveFilePath.getName()).withFileExtension(HoodieArchivedLogFile.ARCHIVE_EXTENSION)
-        .withStorage(metaClient.getStorage()).withInstantTime("").build();
+    return HoodieLogFormatWriter.builder()
+        .withParentPath(archiveFilePath.getParent())
+        .withLogFileId(archiveFilePath.getName())
+        .withFileExtension(HoodieArchivedLogFile.ARCHIVE_EXTENSION)
+        .withStorage(metaClient.getStorage())
+        .withInstantTime("")
+        .build();
   }
 
   private Writer buildWriter(StoragePath archiveFilePath, int logVersion) throws IOException {
-    return HoodieLogFormat.newWriterBuilder().onParentPath(archiveFilePath.getParent())
-        .withFileId(archiveFilePath.getName()).withFileExtension(HoodieArchivedLogFile.ARCHIVE_EXTENSION)
+    return HoodieLogFormatWriter.builder().withParentPath(archiveFilePath.getParent())
+        .withLogFileId(archiveFilePath.getName())
+        .withFileExtension(HoodieArchivedLogFile.ARCHIVE_EXTENSION)
         .withLogVersion(logVersion)
-        .withStorage(metaClient.getStorage()).withInstantTime("").build();
+        .withStorage(metaClient.getStorage())
+        .withInstantTime("")
+        .build();
   }
 
   private void writeArchiveLog(Writer writer, List<IndexedRecord> records) throws Exception {
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java
index a3602b3ff9e8..ed6a6be4105f 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java
@@ -358,12 +358,14 @@ public class HoodieCommonTestHarness {
                                                      String commitTime,
                                                      String 
logBlockInstantTime)
       throws IOException, InterruptedException {
-    HoodieLogFormat.Writer writer =
-        HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
-            .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-            .withSizeThreshold(1024).withFileId(fileId)
-            .withInstantTime(commitTime)
-            .withStorage(storage).build();
+    HoodieLogFormat.Writer writer = HoodieLogFormatWriter.builder()
+        .withParentPath(partitionPath)
+        .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
+        .withSizeThreshold(1024L)
+        .withLogFileId(fileId)
+        .withInstantTime(commitTime)
+        .withStorage(storage)
+        .build();
     if (storage.exists(writer.getLogFile().getPath())) {
       // enable append for reader test.
       ((HoodieLogFormatWriter) writer).withOutputStream(
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileSliceTestUtils.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileSliceTestUtils.java
similarity index 98%
rename from hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileSliceTestUtils.java
rename to hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileSliceTestUtils.java
index 71e3fc670681..d0f5bde6f41e 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileSliceTestUtils.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileSliceTestUtils.java
@@ -36,6 +36,7 @@ import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.log.HoodieLogFormat;
+import org.apache.hudi.common.table.log.HoodieLogFormatWriter;
 import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
 import org.apache.hudi.common.table.log.block.HoodieCDCDataBlock;
 import org.apache.hudi.common.table.log.block.HoodieDataBlock;
@@ -288,10 +289,10 @@ public class HoodieFileSliceTestUtils {
       Map<String, Long> keyToPositionMap
   ) throws InterruptedException, IOException {
     try (HoodieLogFormat.Writer writer =
-             HoodieLogFormat.newWriterBuilder()
-                 .onParentPath(new StoragePath(logFilePath).getParent())
+             HoodieLogFormatWriter.builder()
+                 .withParentPath(new StoragePath(logFilePath).getParent())
                  .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-                 .withFileId(fileId)
+                 .withLogFileId(fileId)
                  .withInstantTime(logInstantTime)
                  .withLogVersion(version)
                  .withStorage(storage).build()) {
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
index 274b2e21ac2b..06f8113b5ee6 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
@@ -30,6 +30,7 @@ import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.schema.HoodieSchemaField;
 import org.apache.hudi.common.schema.HoodieSchemaUtils;
 import org.apache.hudi.common.table.log.HoodieLogFormat;
+import org.apache.hudi.common.table.log.HoodieLogFormatWriter;
 import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
 import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
 import org.apache.hudi.common.table.log.block.HoodieDataBlock;
@@ -371,10 +372,14 @@ public class InputFormatTestUtil {
                                                      int logVersion)
       throws InterruptedException, IOException {
     HoodieLogFormat.Writer writer =
-        HoodieLogFormat.newWriterBuilder().onParentPath(new StoragePath(partitionDir.getPath()))
-            .withFileId(fileId)
-            .withInstantTime(baseCommit).withStorage(storage).withLogVersion(logVersion)
-            .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
+        HoodieLogFormatWriter.builder()
+            .withParentPath(new StoragePath(partitionDir.getPath()))
+            .withLogFileId(fileId)
+            .withInstantTime(baseCommit)
+            .withStorage(storage)
+            .withLogVersion(logVersion)
+            .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
+            .build();
     // generate metadata
     Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
     header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, newCommit);
@@ -406,8 +411,10 @@ public class InputFormatTestUtil {
                                                                
HoodieLogBlock.HoodieLogBlockType logBlockType)
       throws InterruptedException, IOException {
     HoodieLogFormat.Writer writer =
-        HoodieLogFormat.newWriterBuilder().onParentPath(new StoragePath(partitionDir.getPath()))
-            .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId(fileId)
+        HoodieLogFormatWriter.builder()
+            .withParentPath(new StoragePath(partitionDir.getPath()))
+            .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
+            .withLogFileId(fileId)
             .withLogVersion(logVersion)
             .withInstantTime(newCommit)
             .withStorage(storage)
@@ -444,8 +451,10 @@ public class InputFormatTestUtil {
                                                                    String 
oldCommit, int logVersion)
       throws InterruptedException, IOException {
     HoodieLogFormat.Writer writer =
-        HoodieLogFormat.newWriterBuilder().onParentPath(new StoragePath(partitionDir.getPath()))
-            .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId(fileId)
+        HoodieLogFormatWriter.builder()
+            .withParentPath(new StoragePath(partitionDir.getPath()))
+            .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
+            .withLogFileId(fileId)
             .withInstantTime(baseCommit)
             .withLogVersion(logVersion).withStorage(storage).build();
 
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java
index d1f21ddb3c4d..fcfbb2eb9832 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java
@@ -37,6 +37,7 @@ import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.log.AppendResult;
 import org.apache.hudi.common.table.log.HoodieLogFormat;
+import org.apache.hudi.common.table.log.HoodieLogFormatWriter;
 import org.apache.hudi.common.table.log.LogFileCreationCallback;
 import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
 import org.apache.hudi.common.table.log.block.HoodieDataBlock;
@@ -396,11 +397,11 @@ public class TestHoodieSparkMergeOnReadTableInsertUpdateDelete extends SparkClie
 
       final WriteMarkers writeMarkers = WriteMarkersFactory.get(config.getMarkersType(),
           HoodieSparkTable.create(config, context()), newCommitTime);
-      HoodieLogFormat.Writer fakeLogWriter = HoodieLogFormat.newWriterBuilder()
-          .onParentPath(
+      HoodieLogFormat.Writer fakeLogWriter = HoodieLogFormatWriter.builder()
+          .withParentPath(
               FSUtils.constructAbsolutePath(config.getBasePath(),
                   correctWriteStat.getPartitionPath()))
-          .withFileId(correctWriteStat.getFileId())
+          .withLogFileId(correctWriteStat.getFileId())
           .withInstantTime(newCommitTime)
           .withLogVersion(correctLogFile.getLogVersion())
           .withFileSize(0L)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestShowTimelineTableProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestShowTimelineTableProcedure.scala
index c1e84cca83f1..c21b0ec8713b 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestShowTimelineTableProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestShowTimelineTableProcedure.scala
@@ -25,7 +25,7 @@ import org.apache.hudi.common.engine.{HoodieEngineContext, HoodieLocalEngineCont
 import org.apache.hudi.common.engine.LocalTaskContextSupplier
 import org.apache.hudi.common.model.{ActionType, HoodieArchivedLogFile, HoodieAvroIndexedRecord, HoodieCommitMetadata, HoodieLogFile, HoodieRecord, WriteOperationType}
 import org.apache.hudi.common.table.{HoodieTableMetaClient, HoodieTableVersion}
-import org.apache.hudi.common.table.log.HoodieLogFormat
+import org.apache.hudi.common.table.log.{HoodieLogFormat, HoodieLogFormatWriter}
 import org.apache.hudi.common.table.log.block.{HoodieAvroDataBlock, HoodieLogBlock}
 import org.apache.hudi.common.table.timeline.{ActiveAction, HoodieInstant, HoodieTimeline}
 import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion
@@ -398,9 +398,9 @@ class TestShowTimelineTableProcedure extends HoodieSparkSqlTestBase {
       storage.createDirectory(archivePath)
     }
 
-    val writer = HoodieLogFormat.newWriterBuilder()
-      .onParentPath(archiveFilePath.getParent())
-      .withFileId(archiveFilePath.getName())
+    val writer = HoodieLogFormatWriter.builder()
+      .withParentPath(archiveFilePath.getParent())
+      .withLogFileId(archiveFilePath.getName())
       .withFileExtension(HoodieArchivedLogFile.ARCHIVE_EXTENSION)
       .withStorage(storage)
       .withInstantTime("")
diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
index 24e76b2d9f69..0e603857f26d 100644
--- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
+++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
@@ -39,8 +39,8 @@ import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.log.HoodieLogFormat;
 import org.apache.hudi.common.table.log.HoodieLogFormat.Writer;
+import org.apache.hudi.common.table.log.HoodieLogFormatWriter;
 import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
@@ -725,8 +725,10 @@ public class HiveTestUtil {
     HoodieSchema schema = getTestDataSchema(isLogSchemaSimple);
     HoodieBaseFile dataFile = new HoodieBaseFile(storage.getPathInfo(parquetFilePath));
     // Write a log file for this parquet file
-    Writer logWriter = HoodieLogFormat.newWriterBuilder().onParentPath(parquetFilePath.getParent())
-        .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId(dataFile.getFileId())
+    Writer logWriter = HoodieLogFormatWriter.builder()
+        .withParentPath(parquetFilePath.getParent())
+        .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
+        .withLogFileId(dataFile.getFileId())
         .withInstantTime(dataFile.getCommitTime()).withStorage(storage).build();
     List<HoodieRecord> records = (isLogSchemaSimple ? SchemaTestUtil.generateTestRecords(0, 100)
         : SchemaTestUtil.generateEvolvedTestRecords(100, 100)).stream()
@@ -745,9 +747,13 @@ public class HiveTestUtil {
     HoodieSchema schema = SchemaTestUtil.getSchema(logSchemaPath);
     HoodieBaseFile dataFile = new HoodieBaseFile(storage.getPathInfo(parquetFilePath));
     // Write a log file for this parquet file
-    Writer logWriter = HoodieLogFormat.newWriterBuilder().onParentPath(parquetFilePath.getParent())
-        .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId(dataFile.getFileId())
-        .withInstantTime(dataFile.getCommitTime()).withStorage(storage).build();
+    Writer logWriter = HoodieLogFormatWriter.builder()
+        .withParentPath(parquetFilePath.getParent())
+        .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
+        .withLogFileId(dataFile.getFileId())
+        .withInstantTime(dataFile.getCommitTime())
+        .withStorage(storage)
+        .build();
     List<HoodieRecord> records = SchemaTestUtil.generateTestRecords(logSchemaPath, dataPath).stream().map(HoodieAvroIndexedRecord::new).collect(Collectors.toList());
     Map<HeaderMetadataType, String> header = new HashMap<>(2);
     header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, dataFile.getCommitTime());
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
index b9fbe74b6d02..24ee11c8ba1e 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
@@ -38,6 +38,7 @@ import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.log.HoodieLogFormat;
+import org.apache.hudi.common.table.log.HoodieLogFormatWriter;
 import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
 import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock;
@@ -927,10 +928,10 @@ public class TestHoodieMetadataTableValidator extends HoodieSparkClientTestBase
                                        String baseInstantTime,
                                        String instantTime,
                                        boolean writeDataBlock) throws 
IOException, InterruptedException {
-    try (HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
-        .onParentPath(new StoragePath(tempDir.toString()))
+    try (HoodieLogFormat.Writer writer = HoodieLogFormatWriter.builder()
+        .withParentPath(new StoragePath(tempDir.toString()))
         .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-        .withFileId(fileId)
+        .withLogFileId(fileId)
         .withInstantTime(instantTime)
         .withStorage(storage)
         .withSizeThreshold(Long.MAX_VALUE).build()) {

Reply via email to