This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 3d9801a9ea2 Revert "[HUDI-1517] create marker file for every log file 
(#4913)" (#9251)
3d9801a9ea2 is described below

commit 3d9801a9ea2ca663b8540c21c1adbb4fb64cb95f
Author: Danny Chan <[email protected]>
AuthorDate: Fri Jul 21 20:01:06 2023 +0800

    Revert "[HUDI-1517] create marker file for every log file (#4913)" (#9251)
    
    This reverts commit df64d647
---
 .../hudi/cli/integ/ITTestMarkersCommand.java       |  4 +-
 .../org/apache/hudi/io/HoodieAppendHandle.java     |  7 ++
 .../java/org/apache/hudi/io/HoodieWriteHandle.java | 33 --------
 .../java/org/apache/hudi/table/HoodieTable.java    | 24 ++----
 .../rollback/MarkerBasedRollbackStrategy.java      | 96 ++++++++--------------
 .../hudi/table/marker/DirectWriteMarkers.java      |  4 +-
 .../marker/TimelineServerBasedWriteMarkers.java    |  8 +-
 .../org/apache/hudi/table/marker/WriteMarkers.java | 46 +++++------
 .../providers/HoodieMetaClientProvider.java        | 14 ----
 .../java/org/apache/hudi/io/FlinkAppendHandle.java | 27 +++---
 .../TestMergeOnReadRollbackActionExecutor.java     | 23 ++----
 ...dieSparkMergeOnReadTableInsertUpdateDelete.java | 82 +-----------------
 .../TestHoodieSparkMergeOnReadTableRollback.java   | 13 +++
 .../TestMarkerBasedRollbackStrategy.java           | 59 ++++++-------
 .../hudi/table/upgrade/TestUpgradeDowngrade.java   | 45 ----------
 .../table/log/HoodieLogFileWriteCallback.java      | 42 ----------
 .../hudi/common/table/log/HoodieLogFormat.java     | 15 +---
 .../common/table/log/HoodieLogFormatWriter.java    | 22 ++---
 .../hudi/common/testutils/FileCreateUtils.java     | 52 ++----------
 .../hudi/common/testutils/HoodieTestTable.java     | 10 +--
 .../hudi/sink/bucket/ITTestBucketStreamWrite.java  |  8 +-
 .../sql/hudi/procedure/TestCallProcedure.scala     |  2 +-
 22 files changed, 171 insertions(+), 465 deletions(-)

diff --git 
a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestMarkersCommand.java 
b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestMarkersCommand.java
index 934164757ae..5aacfd82de0 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestMarkersCommand.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestMarkersCommand.java
@@ -69,8 +69,8 @@ public class ITTestMarkersCommand extends 
HoodieCLIIntegrationTestBase {
     // generate markers
     String instantTime1 = "101";
 
-    FileCreateUtils.createLogFileMarker(tablePath, "partA", instantTime1, 
"f0", IOType.APPEND);
-    FileCreateUtils.createLogFileMarker(tablePath, "partA", instantTime1, 
"f1", IOType.APPEND);
+    FileCreateUtils.createMarkerFile(tablePath, "partA", instantTime1, "f0", 
IOType.APPEND);
+    FileCreateUtils.createMarkerFile(tablePath, "partA", instantTime1, "f1", 
IOType.APPEND);
 
     assertEquals(2, FileCreateUtils.getTotalMarkerFileCount(tablePath, 
"partA", instantTime1, IOType.APPEND));
 
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
index 1c84d49af29..d0819aa8007 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
@@ -201,6 +201,13 @@ public class HoodieAppendHandle<T, I, K, O> extends 
HoodieWriteHandle<T, I, K, O
             new Path(config.getBasePath()), 
FSUtils.getPartitionPath(config.getBasePath(), partitionPath),
             hoodieTable.getPartitionMetafileFormat());
         partitionMetadata.trySave(getPartitionId());
+
+        // Since the actual log file written to can be different based on when 
rollover happens, we use the
+        // base file to denote some log appends happened on a slice. 
writeToken will still fence concurrent
+        // writers.
+        // https://issues.apache.org/jira/browse/HUDI-1517
+        createMarkerFile(partitionPath, 
FSUtils.makeBaseFileName(baseInstantTime, writeToken, fileId, 
hoodieTable.getBaseFileExtension()));
+
         this.writer = createLogWriter(fileSlice, baseInstantTime);
       } catch (Exception e) {
         LOG.error("Error in update task at commit " + instantTime, e);
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
index a63050ded5f..9e716a280e8 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
@@ -31,7 +31,6 @@ import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.model.IOType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.log.HoodieLogFileWriteCallback;
 import org.apache.hudi.common.table.log.HoodieLogFormat;
 import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.Option;
@@ -40,7 +39,6 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.table.HoodieTable;
-import org.apache.hudi.table.marker.WriteMarkers;
 import org.apache.hudi.table.marker.WriteMarkersFactory;
 
 import org.apache.avro.Schema;
@@ -257,14 +255,9 @@ public abstract class HoodieWriteHandle<T, I, K, O> 
extends HoodieIOHandle<T, I,
         .withRolloverLogWriteToken(writeToken)
         .withLogWriteToken(latestLogFile.map(x -> 
FSUtils.getWriteTokenFromLogPath(x.getPath())).orElse(writeToken))
         .withSuffix(suffix)
-        .withLogWriteCallback(getLogWriteCallback())
         .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
   }
 
-  protected HoodieLogFileWriteCallback getLogWriteCallback() {
-    return new AppendLogWriteCallback();
-  }
-
   protected HoodieLogFormat.Writer createLogWriter(String baseCommitTime, 
String fileSuffix) {
     try {
       return createLogWriter(Option.empty(),baseCommitTime, fileSuffix);
@@ -283,30 +276,4 @@ public abstract class HoodieWriteHandle<T, I, K, O> 
extends HoodieIOHandle<T, I,
       return Option.empty();
     }
   }
-
-  protected class AppendLogWriteCallback implements HoodieLogFileWriteCallback 
{
-    // here we distinguish log files created from log files being appended. 
Considering following scenario:
-    // An appending task write to log file.
-    // (1) append to existing file file_instant_writetoken1.log.1
-    // (2) rollover and create file file_instant_writetoken2.log.2
-    // Then this task failed and retry by a new task.
-    // (3) append to existing file file_instant_writetoken1.log.1
-    // (4) rollover and create file file_instant_writetoken3.log.2
-    // finally file_instant_writetoken2.log.2 should not be committed to hudi, 
we use marker file to delete it.
-    // keep in mind that log file is not always fail-safe unless it never roll 
over
-
-    @Override
-    public boolean preLogFileOpen(HoodieLogFile logFileToAppend) {
-      WriteMarkers writeMarkers = 
WriteMarkersFactory.get(config.getMarkersType(), hoodieTable, instantTime);
-      return writeMarkers.createIfNotExists(partitionPath, 
logFileToAppend.getFileName(), IOType.APPEND,
-          config, fileId, 
hoodieTable.getMetaClient().getActiveTimeline()).isPresent();
-    }
-
-    @Override
-    public boolean preLogFileCreate(HoodieLogFile logFileToCreate) {
-      WriteMarkers writeMarkers = 
WriteMarkersFactory.get(config.getMarkersType(), hoodieTable, instantTime);
-      return writeMarkers.create(partitionPath, logFileToCreate.getFileName(), 
IOType.CREATE,
-          config, fileId, 
hoodieTable.getMetaClient().getActiveTimeline()).isPresent();
-    }
-  }
 }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index 42a12bbcd9e..71295098f03 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -97,7 +97,6 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.TimeoutException;
 import java.util.function.Function;
@@ -728,24 +727,19 @@ public abstract class HoodieTable<T, I, K, O> implements 
Serializable {
         return;
       }
 
-      // we are not including log appends for update here, since they are 
already fail-safe.
-      // but log files created is included
-      Set<String> invalidFilePaths = getInvalidDataPaths(markers);
-      Set<String> validFilePaths = stats.stream()
+      // we are not including log appends here, since they are already 
fail-safe.
+      Set<String> invalidDataPaths = getInvalidDataPaths(markers);
+      Set<String> validDataPaths = stats.stream()
           .map(HoodieWriteStat::getPath)
-          .collect(Collectors.toSet());
-      Set<String> validCdcFilePaths = stats.stream()
-          .map(HoodieWriteStat::getCdcStats)
-          .filter(Objects::nonNull)
-          .flatMap(cdcStat -> cdcStat.keySet().stream())
+          .filter(p -> p.endsWith(this.getBaseFileExtension()))
           .collect(Collectors.toSet());
 
       // Contains list of partially created files. These needs to be cleaned 
up.
-      invalidFilePaths.removeAll(validFilePaths);
-      invalidFilePaths.removeAll(validCdcFilePaths);
-      if (!invalidFilePaths.isEmpty()) {
-        LOG.info("Removing duplicate files created due to task retries before 
committing. Paths=" + invalidFilePaths);
-        Map<String, List<Pair<String, String>>> invalidPathsByPartition = 
invalidFilePaths.stream()
+      invalidDataPaths.removeAll(validDataPaths);
+
+      if (!invalidDataPaths.isEmpty()) {
+        LOG.info("Removing duplicate data files created due to task retries 
before committing. Paths=" + invalidDataPaths);
+        Map<String, List<Pair<String, String>>> invalidPathsByPartition = 
invalidDataPaths.stream()
             .map(dp -> Pair.of(new Path(basePath, dp).getParent().toString(), 
new Path(basePath, dp).toString()))
             .collect(Collectors.groupingBy(Pair::getKey));
 
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java
index 099f1bdcbfc..791191c0ef3 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java
@@ -18,24 +18,21 @@
 
 package org.apache.hudi.table.action.rollback;
 
-import org.apache.hadoop.fs.Path;
-
 import org.apache.hudi.avro.model.HoodieRollbackRequest;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.IOType;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieRollbackException;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.marker.MarkerBasedRollbackUtils;
 import org.apache.hudi.table.marker.WriteMarkers;
 
+import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,9 +41,8 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 
-import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
+import static 
org.apache.hudi.table.action.rollback.BaseRollbackHelper.EMPTY_STRING;
 
 /**
  * Performs rollback using marker files generated during the write..
@@ -82,32 +78,22 @@ public class MarkerBasedRollbackStrategy<T, I, K, O> 
implements BaseRollbackPlan
       return context.map(markerPaths, markerFilePath -> {
         String typeStr = 
markerFilePath.substring(markerFilePath.lastIndexOf(".") + 1);
         IOType type = IOType.valueOf(typeStr);
-        String partitionFilePath = 
WriteMarkers.stripMarkerSuffix(markerFilePath);
-        Path fullFilePath = new Path(basePath, partitionFilePath);
-        String partitionPath = FSUtils.getRelativePartitionPath(new 
Path(basePath), fullFilePath.getParent());
         switch (type) {
           case MERGE:
           case CREATE:
-            String fileId = null;
-            String baseInstantTime = null;
-            if (FSUtils.isBaseFile(fullFilePath)) {
-              HoodieBaseFile baseFileToDelete = new 
HoodieBaseFile(fullFilePath.toString());
-              fileId = baseFileToDelete.getFileId();
-              baseInstantTime = baseFileToDelete.getCommitTime();
-            } else if (FSUtils.isLogFile(fullFilePath)) {
-              checkArgument(type != IOType.MERGE, "Log file should not support 
merge io type");
-              HoodieLogFile logFileToDelete = new 
HoodieLogFile(fullFilePath.toString());
-              fileId = logFileToDelete.getFileId();
-              baseInstantTime = logFileToDelete.getBaseCommitTime();
-            }
-            Objects.requireNonNull(fileId, "Cannot find valid fileId from 
path: " + fullFilePath);
-            Objects.requireNonNull(baseInstantTime, "Cannot find valid base 
instant from path: " + fullFilePath);
-            return new HoodieRollbackRequest(partitionPath, fileId, 
baseInstantTime,
-                Collections.singletonList(fullFilePath.toString()),
+            String fileToDelete = 
WriteMarkers.stripMarkerSuffix(markerFilePath);
+            Path fullDeletePath = new Path(basePath, fileToDelete);
+            String partitionPath = FSUtils.getRelativePartitionPath(new 
Path(basePath), fullDeletePath.getParent());
+            return new HoodieRollbackRequest(partitionPath, EMPTY_STRING, 
EMPTY_STRING,
+                Collections.singletonList(fullDeletePath.toString()),
                 Collections.emptyMap());
           case APPEND:
-            HoodieRollbackRequest rollbackRequestForAppend = 
getRollbackRequestForAppend(partitionFilePath);
-            return rollbackRequestForAppend;
+            // NOTE: This marker file-path does NOT correspond to a log-file, 
but rather is a phony
+            //       path serving as a "container" for the following 
components:
+            //          - Base file's file-id
+            //          - Base file's commit instant
+            //          - Partition path
+            return getRollbackRequestForAppend(instantToRollback, 
WriteMarkers.stripMarkerSuffix(markerFilePath));
           default:
             throw new HoodieRollbackException("Unknown marker type, during 
rollback of " + instantToRollback);
         }
@@ -117,47 +103,35 @@ public class MarkerBasedRollbackStrategy<T, I, K, O> 
implements BaseRollbackPlan
     }
   }
 
-  protected HoodieRollbackRequest getRollbackRequestForAppend(String 
markerFilePath) throws IOException {
-    Path filePath = new Path(basePath, markerFilePath);
-    String fileId;
-    String baseCommitTime;
-    String relativePartitionPath;
-    Option<HoodieLogFile> latestLogFileOption;
-
-    // Old marker files may be generated from base file name before HUDI-1517. 
keep compatible with them.
-    // TODO: may deprecated in the future. @guanziyue.gzy
-    if (FSUtils.isBaseFile(filePath)) {
-      LOG.warn("Find old marker type for log file: " + markerFilePath);
-      fileId = FSUtils.getFileIdFromFilePath(filePath);
-      baseCommitTime = FSUtils.getCommitTime(filePath.getName());
-      relativePartitionPath = FSUtils.getRelativePartitionPath(new 
Path(basePath), filePath.getParent());
-      Path partitionPath = FSUtils.getPartitionPath(config.getBasePath(), 
relativePartitionPath);
-
-      // NOTE: Since we're rolling back incomplete Delta Commit, it only could 
have appended its
-      //       block to the latest log-file
-      try {
-        latestLogFileOption = 
FSUtils.getLatestLogFile(table.getMetaClient().getFs(), partitionPath, fileId,
-            HoodieFileFormat.HOODIE_LOG.getFileExtension(), baseCommitTime);
-      } catch (IOException ioException) {
-        throw new HoodieIOException(
-            "Failed to get latestLogFile for fileId: " + fileId + " in 
partition: " + partitionPath,
-            ioException);
-      }
-    } else {
-      HoodieLogFile latestLogFile = new HoodieLogFile(filePath);
-      fileId = latestLogFile.getFileId();
-      baseCommitTime = latestLogFile.getBaseCommitTime();
-      relativePartitionPath = FSUtils.getRelativePartitionPath(new 
Path(basePath), filePath.getParent());
-      latestLogFileOption = Option.of(latestLogFile);
+  protected HoodieRollbackRequest getRollbackRequestForAppend(HoodieInstant 
instantToRollback, String markerFilePath) throws IOException {
+    Path baseFilePathForAppend = new Path(basePath, markerFilePath);
+    String fileId = FSUtils.getFileIdFromFilePath(baseFilePathForAppend);
+    String baseCommitTime = 
FSUtils.getCommitTime(baseFilePathForAppend.getName());
+    String relativePartitionPath = FSUtils.getRelativePartitionPath(new 
Path(basePath), baseFilePathForAppend.getParent());
+    Path partitionPath = FSUtils.getPartitionPath(config.getBasePath(), 
relativePartitionPath);
+
+    // NOTE: Since we're rolling back incomplete Delta Commit, it only could 
have appended its
+    //       block to the latest log-file
+    // TODO(HUDI-1517) use provided marker-file's path instead
+    Option<HoodieLogFile> latestLogFileOption = 
FSUtils.getLatestLogFile(table.getMetaClient().getFs(), partitionPath, fileId,
+        HoodieFileFormat.HOODIE_LOG.getFileExtension(), baseCommitTime);
+
+    // Log file can be deleted if the commit to rollback is also the commit 
that created the fileGroup
+    if (latestLogFileOption.isPresent() && 
baseCommitTime.equals(instantToRollback.getTimestamp())) {
+      Path fullDeletePath = new Path(partitionPath, 
latestLogFileOption.get().getFileName());
+      return new HoodieRollbackRequest(relativePartitionPath, EMPTY_STRING, 
EMPTY_STRING,
+          Collections.singletonList(fullDeletePath.toString()),
+          Collections.emptyMap());
     }
-
+    
     Map<String, Long> logFilesWithBlocsToRollback = new HashMap<>();
     if (latestLogFileOption.isPresent()) {
       HoodieLogFile latestLogFile = latestLogFileOption.get();
       // NOTE: Marker's don't carry information about the cumulative size of 
the blocks that have been appended,
       //       therefore we simply stub this value.
-      logFilesWithBlocsToRollback = 
Collections.singletonMap(latestLogFile.getPath().toString(), -1L);
+      logFilesWithBlocsToRollback = 
Collections.singletonMap(latestLogFile.getFileStatus().getPath().toString(), 
-1L);
     }
+
     return new HoodieRollbackRequest(relativePartitionPath, fileId, 
baseCommitTime, Collections.emptyList(),
         logFilesWithBlocsToRollback);
   }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java
index bc2a00df7b2..f9c30ca1736 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java
@@ -158,8 +158,8 @@ public class DirectWriteMarkers extends WriteMarkers {
   }
 
   @Override
-  protected Option<Path> create(String partitionPath, String fileName, IOType 
type, boolean checkIfExists) {
-    return create(getMarkerPath(partitionPath, fileName, type), checkIfExists);
+  protected Option<Path> create(String partitionPath, String dataFileName, 
IOType type, boolean checkIfExists) {
+    return create(getMarkerPath(partitionPath, dataFileName, type), 
checkIfExists);
   }
 
   @Override
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/TimelineServerBasedWriteMarkers.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/TimelineServerBasedWriteMarkers.java
index b168089cd01..9d6b7f9b9a9 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/TimelineServerBasedWriteMarkers.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/TimelineServerBasedWriteMarkers.java
@@ -133,9 +133,9 @@ public class TimelineServerBasedWriteMarkers extends 
WriteMarkers {
   }
 
   @Override
-  protected Option<Path> create(String partitionPath, String fileName, IOType 
type, boolean checkIfExists) {
+  protected Option<Path> create(String partitionPath, String dataFileName, 
IOType type, boolean checkIfExists) {
     HoodieTimer timer = HoodieTimer.start();
-    String markerFileName = getMarkerFileName(fileName, type);
+    String markerFileName = getMarkerFileName(dataFileName, type);
 
     Map<String, String> paramsMap = getConfigMap(partitionPath, 
markerFileName, false);
     boolean success = executeCreateMarkerRequest(paramsMap, partitionPath, 
markerFileName);
@@ -149,10 +149,10 @@ public class TimelineServerBasedWriteMarkers extends 
WriteMarkers {
   }
 
   @Override
-  public Option<Path> createWithEarlyConflictDetection(String partitionPath, 
String fileName, IOType type, boolean checkIfExists,
+  public Option<Path> createWithEarlyConflictDetection(String partitionPath, 
String dataFileName, IOType type, boolean checkIfExists,
                                                        HoodieWriteConfig 
config, String fileId, HoodieActiveTimeline activeTimeline) {
     HoodieTimer timer = new HoodieTimer().startTimer();
-    String markerFileName = getMarkerFileName(fileName, type);
+    String markerFileName = getMarkerFileName(dataFileName, type);
     Map<String, String> paramsMap = getConfigMap(partitionPath, 
markerFileName, true);
 
     boolean success = executeCreateMarkerRequest(paramsMap, partitionPath, 
markerFileName);
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkers.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkers.java
index 8a9c5c4ad1a..93aba9c0f89 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkers.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkers.java
@@ -59,12 +59,12 @@ public abstract class WriteMarkers implements Serializable {
    * Creates a marker without checking if the marker already exists.
    *
    * @param partitionPath partition path in the table.
-   * @param fileName      file name.
+   * @param dataFileName  data file name.
    * @param type          write IO type.
    * @return the marker path.
    */
-  public Option<Path> create(String partitionPath, String fileName, IOType 
type) {
-    return create(partitionPath, fileName, type, false);
+  public Option<Path> create(String partitionPath, String dataFileName, IOType 
type) {
+    return create(partitionPath, dataFileName, type, false);
   }
 
   /**
@@ -72,14 +72,14 @@ public abstract class WriteMarkers implements Serializable {
    * This can invoke marker-based early conflict detection when enabled for 
multi-writers.
    *
    * @param partitionPath  partition path in the table
-   * @param fileName       file name
+   * @param dataFileName   data file name
    * @param type           write IO type
    * @param writeConfig    Hudi write configs.
    * @param fileId         File ID.
    * @param activeTimeline Active timeline for the write operation.
    * @return the marker path.
    */
-  public Option<Path> create(String partitionPath, String fileName, IOType 
type, HoodieWriteConfig writeConfig,
+  public Option<Path> create(String partitionPath, String dataFileName, IOType 
type, HoodieWriteConfig writeConfig,
                              String fileId, HoodieActiveTimeline 
activeTimeline) {
     if 
(writeConfig.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()
         && writeConfig.isEarlyConflictDetectionEnable()) {
@@ -88,23 +88,23 @@ public abstract class WriteMarkers implements Serializable {
       // TODO If current is compact or clustering then create marker directly 
without early conflict detection.
       // Need to support early conflict detection between table service and 
common writers.
       if (pendingCompactionTimeline.containsInstant(instantTime) || 
pendingReplaceTimeline.containsInstant(instantTime)) {
-        return create(partitionPath, fileName, type, false);
+        return create(partitionPath, dataFileName, type, false);
       }
-      return createWithEarlyConflictDetection(partitionPath, fileName, type, 
false, writeConfig, fileId, activeTimeline);
+      return createWithEarlyConflictDetection(partitionPath, dataFileName, 
type, false, writeConfig, fileId, activeTimeline);
     }
-    return create(partitionPath, fileName, type, false);
+    return create(partitionPath, dataFileName, type, false);
   }
 
   /**
    * Creates a marker if the marker does not exist.
    *
    * @param partitionPath partition path in the table
-   * @param fileName file name
+   * @param dataFileName data file name
    * @param type write IO type
    * @return the marker path or empty option if already exists
    */
-  public Option<Path> createIfNotExists(String partitionPath, String fileName, 
IOType type) {
-    return create(partitionPath, fileName, type, true);
+  public Option<Path> createIfNotExists(String partitionPath, String 
dataFileName, IOType type) {
+    return create(partitionPath, dataFileName, type, true);
   }
 
   /**
@@ -161,27 +161,27 @@ public abstract class WriteMarkers implements 
Serializable {
   }
 
   /**
-   * Gets the marker file name, in the format of 
"[file_name].marker.[IO_type]".
+   * Gets the marker file name, in the format of 
"[data_file_name].marker.[IO_type]".
    *
-   * @param fileName file name
+   * @param dataFileName data file name
    * @param type IO type
    * @return the marker file name
    */
-  protected static String getMarkerFileName(String fileName, IOType type) {
-    return String.format("%s%s.%s", fileName, 
HoodieTableMetaClient.MARKER_EXTN, type.name());
+  protected String getMarkerFileName(String dataFileName, IOType type) {
+    return String.format("%s%s.%s", dataFileName, 
HoodieTableMetaClient.MARKER_EXTN, type.name());
   }
 
   /**
    * Returns the marker path. Would create the partition path first if not 
exists
    *
    * @param partitionPath The partition path
-   * @param fileName      The file name
+   * @param dataFileName  The data file name
    * @param type          The IO type
    * @return path of the marker file
    */
-  protected Path getMarkerPath(String partitionPath, String fileName, IOType 
type) {
+  protected Path getMarkerPath(String partitionPath, String dataFileName, 
IOType type) {
     Path path = FSUtils.getPartitionPath(markerDirPath, partitionPath);
-    String markerFileName = getMarkerFileName(fileName, type);
+    String markerFileName = getMarkerFileName(dataFileName, type);
     return new Path(path, markerFileName);
   }
 
@@ -203,7 +203,7 @@ public abstract class WriteMarkers implements Serializable {
   /**
    * @param context {@code HoodieEngineContext} instance.
    * @param parallelism parallelism for reading the marker files in the 
directory.
-   * @return all the data file or log file paths of write IO type "CREATE" and 
"MERGE"
+   * @return all the data file paths of write IO type "CREATE" and "MERGE"
    * @throws IOException
    */
   public abstract Set<String> createdAndMergedDataPaths(HoodieEngineContext 
context, int parallelism) throws IOException;
@@ -218,19 +218,19 @@ public abstract class WriteMarkers implements 
Serializable {
    * Creates a marker.
    *
    * @param partitionPath  partition path in the table
-   * @param fileName file name
+   * @param dataFileName  data file name
    * @param type write IO type
    * @param checkIfExists whether to check if the marker already exists
    * @return the marker path or empty option if already exists and {@code 
checkIfExists} is true
    */
-  abstract Option<Path> create(String partitionPath, String fileName, IOType 
type, boolean checkIfExists);
+  abstract Option<Path> create(String partitionPath, String dataFileName, 
IOType type, boolean checkIfExists);
 
   /**
    * Creates a marker with early conflict detection for multi-writers. If 
conflict is detected,
    * an exception is thrown to fail the write operation.
    *
    * @param partitionPath  partition path in the table.
-   * @param fileName       file name.
+   * @param dataFileName   data file name.
    * @param type           write IO type.
    * @param checkIfExists  whether to check if the marker already exists.
    * @param config         Hudi write configs.
@@ -238,6 +238,6 @@ public abstract class WriteMarkers implements Serializable {
    * @param activeTimeline Active timeline for the write operation.
    * @return the marker path or empty option if already exists and {@code 
checkIfExists} is true.
    */
-  public abstract Option<Path> createWithEarlyConflictDetection(String 
partitionPath, String fileName, IOType type, boolean checkIfExists,
+  public abstract Option<Path> createWithEarlyConflictDetection(String 
partitionPath, String dataFileName, IOType type, boolean checkIfExists,
                                                                 
HoodieWriteConfig config, String fileId, HoodieActiveTimeline activeTimeline);
 }
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/providers/HoodieMetaClientProvider.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/providers/HoodieMetaClientProvider.java
index 721cc5e7c5b..0cd7ed5a715 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/providers/HoodieMetaClientProvider.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/providers/HoodieMetaClientProvider.java
@@ -22,9 +22,6 @@ package org.apache.hudi.testutils.providers;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
-import org.apache.hudi.common.table.view.SyncableFileSystemView;
-import org.apache.hudi.common.testutils.HoodieTestTable;
-import org.apache.hudi.exception.HoodieIOException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -40,15 +37,4 @@ public interface HoodieMetaClientProvider {
       HoodieTableMetaClient metaClient, HoodieTimeline visibleActiveTimeline, 
FileStatus[] fileStatuses) {
     return new HoodieTableFileSystemView(metaClient, visibleActiveTimeline, 
fileStatuses);
   }
-
-  default SyncableFileSystemView 
getFileSystemViewWithUnCommittedSlices(HoodieTableMetaClient metaClient) {
-    try {
-      return new HoodieTableFileSystemView(metaClient,
-          metaClient.getActiveTimeline(),
-          HoodieTestTable.of(metaClient).listAllBaseAndLogFiles()
-      );
-    } catch (IOException ioe) {
-      throw new HoodieIOException("Error getting file system view", ioe);
-    }
-  }
 }
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java
index d24a01e26a5..4b56d6a442c 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java
@@ -20,10 +20,7 @@ package org.apache.hudi.io;
 
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.engine.TaskContextSupplier;
-import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.IOType;
-import org.apache.hudi.common.table.log.HoodieLogFileWriteCallback;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.marker.WriteMarkers;
@@ -52,6 +49,7 @@ public class FlinkAppendHandle<T, I, K, O>
   private static final Logger LOG = 
LoggerFactory.getLogger(FlinkAppendHandle.class);
 
   private boolean isClosed = false;
+  private final WriteMarkers writeMarkers;
 
   public FlinkAppendHandle(
       HoodieWriteConfig config,
@@ -62,22 +60,17 @@ public class FlinkAppendHandle<T, I, K, O>
       Iterator<HoodieRecord<T>> recordItr,
       TaskContextSupplier taskContextSupplier) {
     super(config, instantTime, hoodieTable, partitionPath, fileId, recordItr, 
taskContextSupplier);
+    this.writeMarkers = WriteMarkersFactory.get(config.getMarkersType(), 
hoodieTable, instantTime);
   }
 
-  protected HoodieLogFileWriteCallback getLogWriteCallback() {
-    return new AppendLogWriteCallback() {
-      @Override
-      public boolean preLogFileOpen(HoodieLogFile logFileToAppend) {
-        // In some rare cases, the task was pulled up again with same write 
file name,
-        // for e.g, reuse the small log files from last commit instant.
-
-        // Just skip the marker creation if it already exists, the new data 
would append to
-        // the file directly.
-        WriteMarkers writeMarkers = 
WriteMarkersFactory.get(config.getMarkersType(), hoodieTable, instantTime);
-        writeMarkers.createIfNotExists(partitionPath, 
logFileToAppend.getFileName(), IOType.APPEND);
-        return true;
-      }
-    };
+  @Override
+  protected void createMarkerFile(String partitionPath, String dataFileName) {
+    // In some rare cases, the task was pulled up again with same write file 
name,
+    // for e.g, reuse the small log files from last commit instant.
+
+    // Just skip the marker creation if it already exists, the new data would 
append to
+    // the file directly.
+    writeMarkers.createIfNotExists(partitionPath, dataFileName, getIOType());
   }
 
   @Override
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java
index 848b51a6850..f0f2a5e651a 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java
@@ -123,28 +123,24 @@ public class TestMergeOnReadRollbackActionExecutor 
extends HoodieClientRollbackT
     Map<String, HoodieRollbackPartitionMetadata> rollbackMetadata = 
mergeOnReadRollbackActionExecutor.execute().getPartitionMetadata();
     assertEquals(2, rollbackMetadata.size());
 
-    int deleteLogFileNum = isUsingMarkers ? 1 : 0;
     for (Map.Entry<String, HoodieRollbackPartitionMetadata> entry : 
rollbackMetadata.entrySet()) {
       HoodieRollbackPartitionMetadata meta = entry.getValue();
       assertEquals(0, meta.getFailedDeleteFiles().size());
-      assertEquals(deleteLogFileNum, meta.getSuccessDeleteFiles().size());
+      assertEquals(0, meta.getSuccessDeleteFiles().size());
     }
 
     //4. assert file group after rollback, and compare to the rollbackstat
     // assert the first partition data and log file size
-    int remainingLogFileNum = isUsingMarkers ? 0 : 2;
-
     List<HoodieFileGroup> firstPartitionRollBack1FileGroups = 
table.getFileSystemView().getAllFileGroups(DEFAULT_FIRST_PARTITION_PATH).collect(Collectors.toList());
     assertEquals(1, firstPartitionRollBack1FileGroups.size());
     List<FileSlice> firstPartitionRollBack1FileSlices = 
firstPartitionRollBack1FileGroups.get(0).getAllFileSlices().collect(Collectors.toList());
     assertEquals(1, firstPartitionRollBack1FileSlices.size());
     FileSlice firstPartitionRollBack1FileSlice = 
firstPartitionRollBack1FileSlices.get(0);
     List<HoodieLogFile> firstPartitionRollBackLogFiles = 
firstPartitionRollBack1FileSlice.getLogFiles().collect(Collectors.toList());
-    assertEquals(remainingLogFileNum, firstPartitionRollBackLogFiles.size());
-    if (remainingLogFileNum > 0) {
-      firstPartitionRollBackLogFiles.removeAll(firstPartitionCommit2LogFiles);
-      assertEquals(1, firstPartitionRollBackLogFiles.size());
-    }
+    assertEquals(2, firstPartitionRollBackLogFiles.size());
+
+    firstPartitionRollBackLogFiles.removeAll(firstPartitionCommit2LogFiles);
+    assertEquals(1, firstPartitionRollBackLogFiles.size());
 
     // assert the second partition data and log file size
     List<HoodieFileGroup> secondPartitionRollBack1FileGroups = 
table.getFileSystemView().getAllFileGroups(DEFAULT_SECOND_PARTITION_PATH).collect(Collectors.toList());
@@ -153,11 +149,10 @@ public class TestMergeOnReadRollbackActionExecutor 
extends HoodieClientRollbackT
     assertEquals(1, secondPartitionRollBack1FileSlices.size());
     FileSlice secondPartitionRollBack1FileSlice = 
secondPartitionRollBack1FileSlices.get(0);
     List<HoodieLogFile> secondPartitionRollBackLogFiles = 
secondPartitionRollBack1FileSlice.getLogFiles().collect(Collectors.toList());
-    assertEquals(remainingLogFileNum, secondPartitionRollBackLogFiles.size());
-    if (remainingLogFileNum > 0) {
-      
secondPartitionRollBackLogFiles.removeAll(secondPartitionCommit2LogFiles);
-      assertEquals(1, secondPartitionRollBackLogFiles.size());
-    }
+    assertEquals(2, secondPartitionRollBackLogFiles.size());
+
+    secondPartitionRollBackLogFiles.removeAll(secondPartitionCommit2LogFiles);
+    assertEquals(1, secondPartitionRollBackLogFiles.size());
 
     assertFalse(WriteMarkersFactory.get(cfg.getMarkersType(), table, 
"002").doesMarkerDirExist());
   }
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java
index 5b7ec45b226..f061c152104 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java
@@ -19,38 +19,24 @@
 
 package org.apache.hudi.table.functional;
 
-import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.fs.Path;
-
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.FileSlice;
-import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
 import org.apache.hudi.common.model.HoodieBaseFile;
-import org.apache.hudi.common.model.HoodieDeltaWriteStat;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.HoodieWriteStat;
-import org.apache.hudi.common.model.IOType;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.log.AppendResult;
-import org.apache.hudi.common.table.log.HoodieLogFileWriteCallback;
-import org.apache.hudi.common.table.log.HoodieLogFormat;
-import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
-import org.apache.hudi.common.table.log.block.HoodieDataBlock;
-import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
-import org.apache.hudi.common.table.view.SyncableFileSystemView;
 import org.apache.hudi.common.table.view.TableFileSystemView;
 import org.apache.hudi.common.testutils.FileCreateUtils;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
-import org.apache.hudi.common.testutils.SchemaTestUtil;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -58,8 +44,6 @@ import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.table.HoodieSparkTable;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
-import org.apache.hudi.table.marker.WriteMarkers;
-import org.apache.hudi.table.marker.WriteMarkersFactory;
 import org.apache.hudi.testutils.HoodieClientTestUtils;
 import org.apache.hudi.testutils.HoodieMergeOnReadTestUtils;
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
@@ -75,18 +59,12 @@ import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
 import org.junit.jupiter.params.provider.ValueSource;
 
-import java.io.IOException;
-import java.net.URISyntaxException;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Properties;
-import java.util.Random;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import static org.apache.hudi.common.testutils.SchemaTestUtil.getSimpleSchema;
 import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -365,42 +343,6 @@ public class 
TestHoodieSparkMergeOnReadTableInsertUpdateDelete extends SparkClie
       List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100);
       JavaRDD<HoodieRecord> recordsRDD = jsc().parallelize(records, 1);
       JavaRDD<WriteStatus> statuses = writeClient.insert(recordsRDD, 
newCommitTime);
-      long expectedLogFileNum = statuses.map(writeStatus -> 
(HoodieDeltaWriteStat) writeStatus.getStat())
-          .flatMap(deltaWriteStat -> deltaWriteStat.getLogFiles().iterator())
-          .count();
-      // inject a fake log file to test marker file for log file
-      HoodieDeltaWriteStat correctWriteStat = (HoodieDeltaWriteStat) 
statuses.map(WriteStatus::getStat).take(1).get(0);
-      assertTrue(FSUtils.isLogFile(new Path(correctWriteStat.getPath())));
-      HoodieLogFile correctLogFile = new 
HoodieLogFile(correctWriteStat.getPath());
-      String correctWriteToken = 
FSUtils.getWriteTokenFromLogPath(correctLogFile.getPath());
-
-      final String fakeToken = generateFakeWriteToken(correctWriteToken);
-
-      final WriteMarkers writeMarkers = 
WriteMarkersFactory.get(config.getMarkersType(),
-          HoodieSparkTable.create(config, context()), newCommitTime);
-      HoodieLogFormat.Writer fakeLogWriter = HoodieLogFormat.newWriterBuilder()
-          .onParentPath(FSUtils.getPartitionPath(config.getBasePath(), 
correctWriteStat.getPartitionPath()))
-          
.withFileId(correctWriteStat.getFileId()).overBaseCommit(newCommitTime)
-          .withLogVersion(correctLogFile.getLogVersion())
-          .withFileSize(0L)
-          .withSizeThreshold(config.getLogFileMaxSize()).withFs(fs())
-          .withRolloverLogWriteToken(fakeToken)
-          .withLogWriteToken(fakeToken)
-          .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-          .withLogWriteCallback(new HoodieLogFileWriteCallback() {
-            @Override
-            public boolean preLogFileCreate(HoodieLogFile logFileToCreate) {
-              return writeMarkers.create(correctWriteStat.getPartitionPath(), 
logFileToCreate.getFileName(), IOType.CREATE).isPresent();
-            }
-          }).build();
-      AppendResult fakeAppendResult = fakeLogWriter.appendBlock(getLogBlock());
-      // check marker for fake log generated
-      assertTrue(writeMarkers.allMarkerFilePaths().stream().anyMatch(marker -> 
marker.contains(fakeToken)));
-      SyncableFileSystemView unCommittedFsView = 
getFileSystemViewWithUnCommittedSlices(metaClient);
-      // check fake log generated
-      
assertTrue(unCommittedFsView.getAllFileSlices(correctWriteStat.getPartitionPath())
-          .flatMap(FileSlice::getLogFiles).map(HoodieLogFile::getPath)
-          .anyMatch(path -> 
path.getName().equals(fakeAppendResult.logFile().getPath().getName())));
       writeClient.commit(newCommitTime, statuses);
 
       HoodieTable table = HoodieSparkTable.create(config, context(), 
metaClient);
@@ -417,13 +359,11 @@ public class 
TestHoodieSparkMergeOnReadTableInsertUpdateDelete extends SparkClie
           // check the log versions start from the base version
           assertTrue(allSlices.stream().map(slice -> 
slice.getLogFiles().findFirst().get().getLogVersion())
               .allMatch(version -> 
version.equals(HoodieLogFile.LOGFILE_BASE_VERSION)));
-          // check fake log file is deleted
-          assertFalse(allSlices.stream().flatMap(slice -> 
slice.getLogFiles().map(HoodieLogFile::getLogWriteToken))
-              .anyMatch(s -> s.equals(fakeToken)));
         }
         numLogFiles += logFileCount;
       }
-      assertEquals(expectedLogFileNum, numLogFiles);
+
+      assertTrue(numLogFiles > 0);
       // Do a compaction
       String instantTime = 
writeClient.scheduleCompaction(Option.empty()).get().toString();
       HoodieWriteMetadata<JavaRDD<WriteStatus>> compactionMetadata = 
writeClient.compact(instantTime);
@@ -434,22 +374,4 @@ public class 
TestHoodieSparkMergeOnReadTableInsertUpdateDelete extends SparkClie
       writeClient.commitCompaction(instantTime, 
compactionMetadata.getCommitMetadata().get(), Option.empty());
     }
   }
-
-  private HoodieDataBlock getLogBlock() throws IOException, URISyntaxException 
{
-    List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 100);
-    Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
-    header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
-    header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, 
getSimpleSchema().toString());
-    List<HoodieRecord> hoodieRecords = 
records.stream().map(HoodieAvroIndexedRecord::new).collect(Collectors.toList());
-    return new HoodieAvroDataBlock(hoodieRecords, header, 
HoodieRecord.RECORD_KEY_METADATA_FIELD);
-  }
-
-  private String generateFakeWriteToken(String correctWriteToken) {
-    Random random = new Random();
-    String fakeToken = "";
-    do {
-      fakeToken = random.nextLong() + "-" + random.nextLong() + "-" + 
random.nextLong();
-    } while (fakeToken.equals(correctWriteToken));
-    return fakeToken;
-  }
 }
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
index fbc88bb2718..de8c218c1a8 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
@@ -41,6 +41,7 @@ import 
org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.table.view.SyncableFileSystemView;
 import org.apache.hudi.common.table.view.TableFileSystemView;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.testutils.HoodieTestTable;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
@@ -989,4 +990,16 @@ public class TestHoodieSparkMergeOnReadTableRollback 
extends SparkClientFunction
             .build());
     return cfgBuilder.build();
   }
+
+  private SyncableFileSystemView 
getFileSystemViewWithUnCommittedSlices(HoodieTableMetaClient metaClient) {
+    try {
+      return new HoodieTableFileSystemView(metaClient,
+          metaClient.getActiveTimeline(),
+          HoodieTestTable.of(metaClient).listAllBaseAndLogFiles()
+      );
+    } catch (IOException ioe) {
+      throw new HoodieIOException("Error getting file system view", ioe);
+    }
+  }
+
 }
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestMarkerBasedRollbackStrategy.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestMarkerBasedRollbackStrategy.java
index 9ab88d3b52a..d806347b682 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestMarkerBasedRollbackStrategy.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestMarkerBasedRollbackStrategy.java
@@ -47,7 +47,6 @@ import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
-import org.junit.jupiter.params.provider.EnumSource;
 import org.junit.jupiter.params.provider.MethodSource;
 
 import java.io.IOException;
@@ -88,14 +87,11 @@ public class TestMarkerBasedRollbackStrategy extends 
HoodieClientTestBase {
 
   @Test
   public void testMarkerBasedRollbackAppend() throws Exception {
-    tearDown();
-    tableType = HoodieTableType.MERGE_ON_READ;
-    setUp();
     HoodieTestTable testTable = HoodieTestTable.of(metaClient);
     String f0 = testTable.addRequestedCommit("000")
         .getFileIdsWithBaseFilesInPartitions("partA").get("partA");
     testTable.forCommit("001")
-        .withLogMarkerFile("partA", f0, IOType.CREATE);
+        .withMarkerFile("partA", f0, IOType.APPEND);
 
     HoodieTable hoodieTable = HoodieSparkTable.create(getConfig(), context, 
metaClient);
     List<HoodieRollbackRequest> rollbackRequests = new 
MarkerBasedRollbackStrategy(hoodieTable, context, getConfig(),
@@ -103,29 +99,6 @@ public class TestMarkerBasedRollbackStrategy extends 
HoodieClientTestBase {
     assertEquals(1, rollbackRequests.size());
   }
 
-  @ParameterizedTest
-  @EnumSource(names = {"APPEND", "CREATE"})
-  public void testMarkerBasedRollbackAppendWithLogFileMarkers(IOType 
testIOType) throws Exception {
-    tearDown();
-    tableType = HoodieTableType.MERGE_ON_READ;
-    setUp();
-    HoodieTestTable testTable = HoodieTestTable.of(metaClient);
-    String f0 = testTable.addRequestedCommit("000")
-        .getFileIdWithLogFile("partA");
-    testTable.forCommit("001")
-        .withLogMarkerFile("partA", f0, testIOType);
-
-    HoodieTable hoodieTable = HoodieSparkTable.create(getConfig(), context, 
metaClient);
-    List<HoodieRollbackRequest> rollbackRequests = new 
MarkerBasedRollbackStrategy(hoodieTable, context, getConfig(),
-        "002").getRollbackRequests(new 
HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.DELTA_COMMIT_ACTION, 
"001"));
-    assertEquals(1, rollbackRequests.size());
-    HoodieRollbackRequest rollbackRequest = rollbackRequests.get(0);
-    assertEquals("partA", rollbackRequest.getPartitionPath());
-    assertEquals(f0, rollbackRequest.getFileId());
-    assertEquals(testIOType.equals(IOType.CREATE) ? 1 : 0, 
rollbackRequest.getFilesToBeDeleted().size());
-    assertEquals(testIOType.equals(IOType.CREATE) ? 0 : 1, 
rollbackRequest.getLogBlocksToBeDeleted().size());
-  }
-
   @Test
   public void testCopyOnWriteRollbackWithTestTable() throws Exception {
     // given: wrote some base files and corresponding markers
@@ -183,6 +156,34 @@ public class TestMarkerBasedRollbackStrategy extends 
HoodieClientTestBase {
     }
   }
 
+  @ParameterizedTest(name = TEST_NAME_WITH_PARAMS)
+  @MethodSource("configParams")
+  public void testMergeOnReadRollback(boolean useFileListingMetadata) throws 
Exception {
+    // init MERGE_ON_READ_TABLE
+    tearDown();
+    tableType = HoodieTableType.MERGE_ON_READ;
+    setUp();
+
+    HoodieWriteConfig writeConfig = 
getConfigBuilder().withRollbackUsingMarkers(true).withAutoCommit(false)
+        
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(useFileListingMetadata).build())
+        .withPath(basePath).build();
+
+    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
+    try (SparkRDDWriteClient writeClient = new 
SparkRDDWriteClient(engineContext, writeConfig)) {
+
+      // rollback 2nd commit and ensure stats reflect the info.
+      List<HoodieRollbackStat> stats = 
testUpdateAndRollback(useFileListingMetadata, writeConfig, writeClient);
+
+      assertEquals(3, stats.size());
+      for (HoodieRollbackStat stat : stats) {
+        assertEquals(0, stat.getSuccessDeleteFiles().size());
+        assertEquals(0, stat.getFailedDeleteFiles().size());
+        assertEquals(1, stat.getCommandBlocksCount().size());
+        stat.getCommandBlocksCount().forEach((fileStatus, len) -> 
assertTrue(fileStatus.getPath().getName().contains(HoodieFileFormat.HOODIE_LOG.getFileExtension())));
+      }
+    }
+  }
+
   @ParameterizedTest(name = TEST_NAME_WITH_PARAMS)
   @MethodSource("configParams")
   public void testMergeOnReadRollbackDeletesFirstAppendFiles(boolean 
useFileListingMetadata) throws Exception {
@@ -262,7 +263,7 @@ public class TestMarkerBasedRollbackStrategy extends 
HoodieClientTestBase {
     String f0 = testTable.addRequestedCommit("000")
         .getFileIdsWithBaseFilesInPartitions("partA").get("partA");
     testTable.forCommit("001")
-        .withLogMarkerFile("partA", f0, IOType.APPEND);
+        .withMarkerFile("partA", f0, IOType.APPEND);
 
     HoodieTable hoodieTable = HoodieSparkTable.create(getConfig(), context, 
metaClient);
 
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java
index 7d8416c368a..d76db5d5966 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java
@@ -22,13 +22,11 @@ import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
-import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieFileGroup;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
-import org.apache.hudi.common.model.IOType;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTableVersion;
@@ -201,7 +199,6 @@ public class TestUpgradeDowngrade extends 
HoodieClientTestBase {
     Pair<List<HoodieRecord>, List<HoodieRecord>> inputRecords = 
twoUpsertCommitDataWithTwoPartitions(firstPartitionCommit2FileSlices, 
secondPartitionCommit2FileSlices, cfg, client, false);
 
     HoodieTable table = this.getHoodieTable(metaClient, cfg);
-    prepForUpgradeFromZeroToOne(table);
     HoodieInstant commitInstant = 
table.getPendingCommitTimeline().lastInstant().get();
 
     // delete one of the marker files in 2nd commit if need be.
@@ -851,48 +848,6 @@ public class TestUpgradeDowngrade extends 
HoodieClientTestBase {
     return Pair.of(records, records2);
   }
 
-  /**
-   * Since how markers are generated for log file changed in Version Six, we 
regenerate markers in the way version zero do.
-   *
-   * @param table instance of {@link HoodieTable}
-   */
-  private void prepForUpgradeFromZeroToOne(HoodieTable table) throws 
IOException {
-    List<HoodieInstant> instantsToBeParsed =
-        metaClient.getActiveTimeline()
-            .getCommitsTimeline()
-            .getInstantsAsStream()
-            .collect(Collectors.toList());
-    for (HoodieInstant instant : instantsToBeParsed) {
-      WriteMarkers writeMarkers =
-          WriteMarkersFactory.get(table.getConfig().getMarkersType(), table, 
instant.getTimestamp());
-      Set<String> oldMarkers = writeMarkers.allMarkerFilePaths();
-      boolean hasAppendMarker = oldMarkers.stream().anyMatch(marker -> 
marker.contains(IOType.APPEND.name())
-          || marker.contains(IOType.CREATE.name()));
-      if (hasAppendMarker) {
-        // delete all markers and regenerate
-        writeMarkers.deleteMarkerDir(table.getContext(), 2);
-        for (String oldMarker : oldMarkers) {
-          String typeStr = oldMarker.substring(oldMarker.lastIndexOf(".") + 1);
-          IOType type = IOType.valueOf(typeStr);
-          String partitionFilePath = WriteMarkers.stripMarkerSuffix(oldMarker);
-          Path fullFilePath = new Path(basePath, partitionFilePath);
-          String partitionPath = FSUtils.getRelativePartitionPath(new 
Path(basePath), fullFilePath.getParent());
-          if (FSUtils.isBaseFile(fullFilePath)) {
-            writeMarkers.create(partitionPath, fullFilePath.getName(), type);
-          } else {
-            String fileId = FSUtils.getFileIdFromFilePath(fullFilePath);
-            String baseInstant = 
FSUtils.getBaseCommitTimeFromLogPath(fullFilePath);
-            String writeToken = FSUtils.getWriteTokenFromLogPath(fullFilePath);
-            writeMarkers.create(partitionPath,
-                FSUtils.makeBaseFileName(baseInstant, writeToken, fileId, 
table.getBaseFileFormat().getFileExtension()), type);
-          }
-        }
-        writeMarkers.allMarkerFilePaths()
-            .forEach(markerPath -> 
assertFalse(markerPath.contains(HoodieLogFile.DELTA_EXTENSION)));
-      }
-    }
-  }
-
   private void prepForDowngradeFromVersion(HoodieTableVersion fromVersion) 
throws IOException {
     metaClient.getTableConfig().setTableVersion(fromVersion);
     Path propertyFile = new Path(metaClient.getMetaPath() + "/" + 
HoodieTableConfig.HOODIE_PROPERTIES_FILE);
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileWriteCallback.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileWriteCallback.java
deleted file mode 100644
index 652c013cc3e..00000000000
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileWriteCallback.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.common.table.log;
-
-import org.apache.hudi.common.model.HoodieLogFile;
-
-/**
- * HoodieLogFileWriteCallback is trigger when specific log file operation 
happen
- */
-public interface HoodieLogFileWriteCallback {
-  default boolean preLogFileOpen(HoodieLogFile logFileToAppend) {
-    return true;
-  }
-
-  default boolean preLogFileCreate(HoodieLogFile logFileToCreate) {
-    return true;
-  }
-
-  default boolean preLogFileClose(HoodieLogFile logFileToClose) {
-    return true;
-  }
-
-  default boolean postLogFileClose(HoodieLogFile logFileToClose) {
-    return true;
-  }
-}
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java
index 5e7d0806fae..d77be9a281b 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java
@@ -145,8 +145,6 @@ public interface HoodieLogFormat {
     private String suffix;
     // Rollover Log file write token
     private String rolloverLogWriteToken;
-    // A call back triggered with log file operation
-    private HoodieLogFileWriteCallback logFileWriteCallback;
 
     public WriterBuilder withBufferSize(int bufferSize) {
       this.bufferSize = bufferSize;
@@ -203,11 +201,6 @@ public interface HoodieLogFormat {
       return this;
     }
 
-    public WriterBuilder withLogWriteCallback(HoodieLogFileWriteCallback 
logFileWriteCallback) {
-      this.logFileWriteCallback = logFileWriteCallback;
-      return this;
-    }
-
     public WriterBuilder withFileSize(long fileLen) {
       this.fileLen = fileLen;
       return this;
@@ -240,11 +233,6 @@ public interface HoodieLogFormat {
         rolloverLogWriteToken = UNKNOWN_WRITE_TOKEN;
       }
 
-      if (logFileWriteCallback == null) {
-        // use a callback do nothing here as default callback.
-        logFileWriteCallback = new HoodieLogFileWriteCallback() {};
-      }
-
       if (logVersion == null) {
         LOG.info("Computing the next log version for " + logFileId + " in " + 
parentPath);
         Option<Pair<Integer, String>> versionAndWriteToken =
@@ -291,8 +279,7 @@ public interface HoodieLogFormat {
       if (sizeThreshold == null) {
         sizeThreshold = DEFAULT_SIZE_THRESHOLD;
       }
-      return new HoodieLogFormatWriter(fs, logFile, bufferSize, replication, 
sizeThreshold,
-          rolloverLogWriteToken, logFileWriteCallback);
+      return new HoodieLogFormatWriter(fs, logFile, bufferSize, replication, 
sizeThreshold, rolloverLogWriteToken);
     }
   }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java
index 07cbfef8fe3..081c18e8f65 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java
@@ -55,21 +55,18 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
   private final Integer bufferSize;
   private final Short replication;
   private final String rolloverLogWriteToken;
-  final HoodieLogFileWriteCallback logFileWriteCallback;
   private boolean closed = false;
   private transient Thread shutdownThread = null;
 
   private static final String APPEND_UNAVAILABLE_EXCEPTION_MESSAGE = "not sufficiently replicated yet";
 
-  HoodieLogFormatWriter(FileSystem fs, HoodieLogFile logFile, Integer bufferSize, Short replication, Long sizeThreshold,
-                        String rolloverLogWriteToken, HoodieLogFileWriteCallback logFileWriteCallback) {
+  HoodieLogFormatWriter(FileSystem fs, HoodieLogFile logFile, Integer bufferSize, Short replication, Long sizeThreshold, String rolloverLogWriteToken) {
     this.fs = fs;
     this.logFile = logFile;
     this.sizeThreshold = sizeThreshold;
     this.bufferSize = bufferSize;
     this.replication = replication;
     this.rolloverLogWriteToken = rolloverLogWriteToken;
-    this.logFileWriteCallback = logFileWriteCallback;
     addShutDownHook();
   }
 
@@ -97,9 +94,7 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
       Path path = logFile.getPath();
       if (fs.exists(path)) {
         boolean isAppendSupported = StorageSchemes.isAppendSupported(fs.getScheme());
-        // here we use marker file to fence concurrent append to the same file. So it is safe to use speculation in spark now.
-        boolean callbackSuccess = logFileWriteCallback.preLogFileOpen(logFile);
-        if (isAppendSupported && callbackSuccess) {
+        if (isAppendSupported) {
           LOG.info(logFile + " exists. Appending to existing file");
           try {
             // open the path for append and record the offset
@@ -121,11 +116,10 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
             }
           }
         }
-        if (!isAppendSupported || !callbackSuccess) {
+        if (!isAppendSupported) {
           rollOver();
           createNewFile();
-          String rolloverReason = isAppendSupported ? "Append not supported" : "Callback failed";
-          LOG.info(rolloverReason + ". Rolling over to " + logFile);
+          LOG.info("Append not supported.. Rolling over to " + logFile);
         }
       } else {
         LOG.info(logFile + " does not exist. Create a new file");
@@ -236,7 +230,6 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
   }
 
   private void createNewFile() throws IOException {
-    logFileWriteCallback.preLogFileCreate(logFile);
     this.output =
         fs.create(this.logFile.getPath(), false, bufferSize, replication, WriterBuilder.DEFAULT_SIZE_THRESHOLD, null);
   }
@@ -246,12 +239,7 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
     if (null != shutdownThread) {
       Runtime.getRuntime().removeShutdownHook(shutdownThread);
     }
-    logFileWriteCallback.preLogFileClose(logFile);
-    try {
-      closeStream();
-    } finally {
-      logFileWriteCallback.postLogFileClose(logFile);
-    }
+    closeStream();
   }
 
   private void closeStream() throws IOException {
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
index 469b5c8bf39..4ace66779ec 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
@@ -30,7 +30,6 @@ import org.apache.hudi.avro.model.HoodieSavepointMetadata;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFileFormat;
-import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodiePartitionMetadata;
 import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
 import org.apache.hudi.common.model.IOType;
@@ -41,7 +40,6 @@ import 
org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.table.view.TableFileSystemView;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.exception.HoodieException;
 
 import org.apache.hadoop.fs.FileSystem;
@@ -98,20 +96,17 @@ public class FileCreateUtils {
     return FSUtils.makeLogFileName(fileId, fileExtension, instantTime, 
version, WRITE_TOKEN);
   }
 
-  public static String markerFileName(String fileName, IOType ioType) {
-    return String.format("%s%s.%s", fileName, 
HoodieTableMetaClient.MARKER_EXTN, ioType.name());
+  public static String markerFileName(String instantTime, String fileId, 
IOType ioType) {
+    return markerFileName(instantTime, fileId, ioType, BASE_FILE_EXTENSION);
   }
 
-  public static String dataFileMarkerFileName(String instantTime, String 
fileId, IOType ioType, String fileExtension, String writeToken) {
-    return markerFileName(FSUtils.makeBaseFileName(instantTime, writeToken, 
fileId, fileExtension), ioType);
+  public static String markerFileName(String instantTime, String fileId, 
IOType ioType, String fileExtension) {
+    return markerFileName(instantTime, fileId, ioType, fileExtension, 
WRITE_TOKEN);
   }
 
-  public static String logFileMarkerFileName(String instantTime, String 
fileId, IOType ioType, int logVersion) {
-    return logFileMarkerFileName(instantTime, fileId, ioType, 
HoodieLogFile.DELTA_EXTENSION, logVersion);
-  }
-
-  public static String logFileMarkerFileName(String instantTime, String 
fileId, IOType ioType, String fileExtension, int logVersion) {
-    return markerFileName(FSUtils.makeLogFileName(fileId, fileExtension, 
instantTime, logVersion, WRITE_TOKEN), ioType);
+  public static String markerFileName(String instantTime, String fileId, 
IOType ioType, String fileExtension, String writeToken) {
+    return String.format("%s_%s_%s%s%s.%s", fileId, writeToken, instantTime, 
fileExtension,
+        HoodieTableMetaClient.MARKER_EXTN, ioType);
   }
 
   private static void createMetaFile(String basePath, String instantTime, 
String suffix, FileSystem fs) throws IOException {
@@ -373,38 +368,9 @@ public class FileCreateUtils {
 
   public static String createMarkerFile(String basePath, String partitionPath, 
String commitInstant,
       String instantTime, String fileId, IOType ioType, String writeToken) 
throws IOException {
-    // with HUDI-1517, this method is used for creating marker files for data 
files. So append type is not allowed
-    ValidationUtils.checkArgument(ioType != IOType.APPEND);
-    Path parentPath = Paths.get(basePath, 
HoodieTableMetaClient.TEMPFOLDER_NAME, instantTime, partitionPath);
-    Files.createDirectories(parentPath);
-    Path markerFilePath = 
parentPath.resolve(dataFileMarkerFileName(instantTime, fileId, ioType, 
BASE_FILE_EXTENSION, writeToken));
-    if (Files.notExists(markerFilePath)) {
-      Files.createFile(markerFilePath);
-    }
-    return markerFilePath.toAbsolutePath().toString();
-  }
-
-  public static String createLogFileMarker(String basePath, String 
partitionPath, String instantTime, String fileId, IOType ioType)
-      throws IOException {
-    return createLogFileMarker(basePath, partitionPath, instantTime, fileId, 
ioType, HoodieLogFile.LOGFILE_BASE_VERSION);
-  }
-
-  public static String createLogFileMarker(String basePath, String 
partitionPath, String instantTime, String fileId, IOType ioType, int logVersion)
-      throws IOException {
-    Path parentPath = Paths.get(basePath, 
HoodieTableMetaClient.TEMPFOLDER_NAME, instantTime, partitionPath);
-    Files.createDirectories(parentPath);
-    Path markerFilePath = 
parentPath.resolve(logFileMarkerFileName(instantTime, fileId, ioType, 
logVersion));
-    if (Files.notExists(markerFilePath)) {
-      Files.createFile(markerFilePath);
-    }
-    return markerFilePath.toAbsolutePath().toString();
-  }
-
-  public static String createFileMarkerByFileName(String basePath, String 
partitionPath, String instantTime, String fileName, IOType ioType)
-      throws IOException {
-    Path parentPath = Paths.get(basePath, 
HoodieTableMetaClient.TEMPFOLDER_NAME, instantTime, partitionPath);
+    Path parentPath = Paths.get(basePath, 
HoodieTableMetaClient.TEMPFOLDER_NAME, commitInstant, partitionPath);
     Files.createDirectories(parentPath);
-    Path markerFilePath = parentPath.resolve(markerFileName(fileName, ioType));
+    Path markerFilePath = parentPath.resolve(markerFileName(instantTime, 
fileId, ioType, BASE_FILE_EXTENSION, writeToken));
     if (Files.notExists(markerFilePath)) {
       Files.createFile(markerFilePath);
     }
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
index 49095683a2b..b1dfa366dd8 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
@@ -66,6 +66,8 @@ import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.nio.file.Files;
@@ -105,7 +107,6 @@ import static 
org.apache.hudi.common.testutils.FileCreateUtils.createInflightDel
 import static 
org.apache.hudi.common.testutils.FileCreateUtils.createInflightReplaceCommit;
 import static 
org.apache.hudi.common.testutils.FileCreateUtils.createInflightRollbackFile;
 import static 
org.apache.hudi.common.testutils.FileCreateUtils.createInflightSavepoint;
-import static 
org.apache.hudi.common.testutils.FileCreateUtils.createLogFileMarker;
 import static 
org.apache.hudi.common.testutils.FileCreateUtils.createMarkerFile;
 import static 
org.apache.hudi.common.testutils.FileCreateUtils.createReplaceCommit;
 import static 
org.apache.hudi.common.testutils.FileCreateUtils.createRequestedCleanFile;
@@ -132,6 +133,7 @@ public class HoodieTestTable {
   public static final String PHONY_TABLE_SCHEMA =
       "{\"namespace\": \"org.apache.hudi.avro.model\", \"type\": \"record\", 
\"name\": \"PhonyRecord\", \"fields\": []}";
 
+  private static final Logger LOG = 
LoggerFactory.getLogger(HoodieTestTable.class);
   private static final Random RANDOM = new Random();
 
   protected static HoodieTestTableState testTableState;
@@ -596,11 +598,6 @@ public class HoodieTestTable {
     return this;
   }
 
-  public HoodieTestTable withLogMarkerFile(String partitionPath, String 
fileId, IOType ioType) throws IOException {
-    createLogFileMarker(basePath, partitionPath, currentInstantTime, fileId, 
ioType);
-    return this;
-  }
-
   /**
    * Insert one base file to each of the given distinct partitions.
    *
@@ -779,7 +776,6 @@ public class HoodieTestTable {
 
   public FileStatus[] listAllLogFiles(String fileExtension) throws IOException 
{
     return FileSystemTestUtils.listRecursive(fs, new Path(basePath)).stream()
-        .filter(status -> 
!status.getPath().toString().contains(HoodieTableMetaClient.METAFOLDER_NAME))
         .filter(status -> status.getPath().getName().contains(fileExtension))
         .toArray(FileStatus[]::new);
   }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bucket/ITTestBucketStreamWrite.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bucket/ITTestBucketStreamWrite.java
index fbe16e3e52f..ecd31cd719e 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bucket/ITTestBucketStreamWrite.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bucket/ITTestBucketStreamWrite.java
@@ -133,13 +133,17 @@ public class ITTestBucketStreamWrite {
     Path path = new Path(metaClient.getMetaPath() + Path.SEPARATOR + filename);
     fs.delete(path);
 
+    // marker types are different for COW and MOR
+    IOType ioType = isCow ? IOType.CREATE : IOType.APPEND;
+
     commitMetadata.getFileIdAndRelativePaths().forEach((fileId, relativePath) 
-> {
       // hacky way to reconstruct markers ¯\_(ツ)_/¯
       String[] partitionFileNameSplit = relativePath.split("/");
+      String fileInstant = FSUtils.getCommitTime(partitionFileNameSplit[1]);
       String partition = partitionFileNameSplit[0];
-      String fileName = partitionFileNameSplit[1];
+      String writeToken = isCow ? getWriteToken(partitionFileNameSplit[1]) : 
FSUtils.getWriteTokenFromLogPath(new Path(relativePath));
       try {
-        FileCreateUtils.createFileMarkerByFileName(tablePath, partition, 
commitInstant, fileName, IOType.CREATE);
+        FileCreateUtils.createMarkerFile(tablePath, partition, commitInstant, 
fileInstant, fileId, ioType, writeToken);
       } catch (IOException e) {
         throw new RuntimeException(e);
       }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCallProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCallProcedure.scala
index fe4c3bda3d6..5b90e266819 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCallProcedure.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCallProcedure.scala
@@ -196,7 +196,7 @@ class TestCallProcedure extends 
HoodieSparkProcedureTestBase {
         s"Argument: instant_time is required")
 
       val instantTime = "101"
-      FileCreateUtils.createLogFileMarker(tablePath, "", instantTime, "f0", 
IOType.APPEND)
+      FileCreateUtils.createMarkerFile(tablePath, "", instantTime, "f0", 
IOType.APPEND)
       assertResult(1) {
         FileCreateUtils.getTotalMarkerFileCount(tablePath, "", instantTime, 
IOType.APPEND)
       }

Reply via email to