This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch HUDI-3406-revert
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit de4d5aa429f0aaaf1bb86123238fd40c69e49d85
Author: Raymond Xu <[email protected]>
AuthorDate: Thu Apr 14 17:03:46 2022 +0800

    Revert "[HUDI-3406] Rollback incorrectly relying on FS listing instead of 
Com… (#4957)"
    
    This reverts commit 98b4e9796e1e3e1f69954afa698ace5b28bde4a0.
---
 .../hudi/client/utils/MetadataConversionUtils.java |  17 +-
 .../table/action/rollback/BaseRollbackHelper.java  |   5 +
 .../rollback/ListingBasedRollbackHelper.java       | 150 ++++++++++
 .../rollback/ListingBasedRollbackStrategy.java     | 302 ++-------------------
 .../hudi/table/action/rollback/RollbackUtils.java  | 167 ++++++++++++
 .../action/rollback/SerializablePathFilter.java    |  26 --
 .../table/upgrade/ZeroToOneUpgradeHandler.java     |  34 ++-
 .../TestMergeOnReadRollbackActionExecutor.java     |   5 +-
 .../hudi/common/model/HoodieCommitMetadata.java    |  13 -
 9 files changed, 368 insertions(+), 351 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/MetadataConversionUtils.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/MetadataConversionUtils.java
index 342de74a11..c0405161d8 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/MetadataConversionUtils.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/MetadataConversionUtils.java
@@ -21,6 +21,7 @@ package org.apache.hudi.client.utils;
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import java.io.IOException;
+
 import org.apache.hudi.avro.model.HoodieArchivedMetaEntry;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
@@ -33,7 +34,6 @@ import 
org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
 import org.apache.hudi.common.model.HoodieRollingStatMetadata;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
@@ -186,19 +186,6 @@ public class MetadataConversionUtils {
     return 
Option.of(TimelineMetadataUtils.deserializeRequestedReplaceMetadata(requestedContent.get()));
   }
 
-  public static Option<HoodieCommitMetadata> 
getHoodieCommitMetadata(HoodieTableMetaClient metaClient, HoodieInstant 
hoodieInstant) throws IOException {
-    HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
-    HoodieTimeline timeline = 
activeTimeline.getCommitsTimeline().filterCompletedInstants();
-
-    if 
(hoodieInstant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
-      return 
Option.of(HoodieReplaceCommitMetadata.fromBytes(timeline.getInstantDetails(hoodieInstant).get(),
-          HoodieReplaceCommitMetadata.class));
-    }
-    return 
Option.of(HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(hoodieInstant).get(),
-        HoodieCommitMetadata.class));
-
-  }
-
   public static org.apache.hudi.avro.model.HoodieCommitMetadata 
convertCommitMetadata(
           HoodieCommitMetadata hoodieCommitMetadata) {
     ObjectMapper mapper = new ObjectMapper();
@@ -213,4 +200,4 @@ public class MetadataConversionUtils {
     
avroMetaData.getExtraMetadata().put(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY,
 "");
     return avroMetaData;
   }
-}
+}
\ No newline at end of file
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackHelper.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackHelper.java
index 8475afe16e..189de373d9 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackHelper.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackHelper.java
@@ -20,6 +20,7 @@ package org.apache.hudi.table.action.rollback;
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hudi.avro.model.HoodieRollbackRequest;
 import org.apache.hudi.common.HoodieRollbackStat;
 import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -213,4 +214,8 @@ public class BaseRollbackHelper implements Serializable {
         
String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal()));
     return header;
   }
+
+  public interface SerializablePathFilter extends PathFilter, Serializable {
+
+  }
 }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java
new file mode 100644
index 0000000000..628b2fc372
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.action.rollback;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hudi.avro.model.HoodieRollbackRequest;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static 
org.apache.hudi.table.action.rollback.BaseRollbackHelper.EMPTY_STRING;
+
+/**
+ * Performs Rollback of Hoodie Tables.
+ */
+public class ListingBasedRollbackHelper implements Serializable {
+  private static final Logger LOG = 
LogManager.getLogger(ListingBasedRollbackHelper.class);
+
+  private final HoodieTableMetaClient metaClient;
+  private final HoodieWriteConfig config;
+
+  public ListingBasedRollbackHelper(HoodieTableMetaClient metaClient, 
HoodieWriteConfig config) {
+    this.metaClient = metaClient;
+    this.config = config;
+  }
+
+  /**
+   * Collects info for Rollback plan.
+   */
+  public List<HoodieRollbackRequest> 
getRollbackRequestsForRollbackPlan(HoodieEngineContext context, HoodieInstant 
instantToRollback, List<ListingBasedRollbackRequest> rollbackRequests) {
+    int sparkPartitions = Math.max(Math.min(rollbackRequests.size(), 
config.getRollbackParallelism()), 1);
+    context.setJobStatus(this.getClass().getSimpleName(), "Creating Rollback 
Plan");
+    return getListingBasedRollbackRequests(context, instantToRollback, 
rollbackRequests, sparkPartitions);
+  }
+
+  /**
+   * May be delete interested files and collect stats or collect stats only.
+   *
+   * @param context           instance of {@link HoodieEngineContext} to use.
+   * @param instantToRollback {@link HoodieInstant} of interest for which 
deletion or collect stats is requested.
+   * @param rollbackRequests  List of {@link ListingBasedRollbackRequest} to 
be operated on.
+   * @param numPartitions     number of spark partitions to use for 
parallelism.
+   * @return stats collected with or w/o actual deletions.
+   */
+  private List<HoodieRollbackRequest> 
getListingBasedRollbackRequests(HoodieEngineContext context, HoodieInstant 
instantToRollback,
+                                                                      
List<ListingBasedRollbackRequest> rollbackRequests, int numPartitions) {
+    return context.map(rollbackRequests, rollbackRequest -> {
+      switch (rollbackRequest.getType()) {
+        case DELETE_DATA_FILES_ONLY: {
+          final FileStatus[] filesToDeletedStatus = 
getBaseFilesToBeDeleted(metaClient, config, instantToRollback.getTimestamp(),
+              rollbackRequest.getPartitionPath(), metaClient.getFs());
+          List<String> filesToBeDeleted = 
Arrays.stream(filesToDeletedStatus).map(fileStatus -> {
+            String fileToBeDeleted = fileStatus.getPath().toString();
+            // strip scheme
+            return fileToBeDeleted.substring(fileToBeDeleted.indexOf(":") + 1);
+          }).collect(Collectors.toList());
+          return new HoodieRollbackRequest(rollbackRequest.getPartitionPath(),
+              EMPTY_STRING, EMPTY_STRING, filesToBeDeleted, 
Collections.EMPTY_MAP);
+        }
+        case DELETE_DATA_AND_LOG_FILES: {
+          final FileStatus[] filesToDeletedStatus = 
getBaseAndLogFilesToBeDeleted(instantToRollback.getTimestamp(), 
rollbackRequest.getPartitionPath(), metaClient.getFs());
+          List<String> filesToBeDeleted = 
Arrays.stream(filesToDeletedStatus).map(fileStatus -> {
+            String fileToBeDeleted = fileStatus.getPath().toString();
+            // strip scheme
+            return fileToBeDeleted.substring(fileToBeDeleted.indexOf(":") + 1);
+          }).collect(Collectors.toList());
+          return new HoodieRollbackRequest(rollbackRequest.getPartitionPath(), 
EMPTY_STRING, EMPTY_STRING, filesToBeDeleted, Collections.EMPTY_MAP);
+        }
+        case APPEND_ROLLBACK_BLOCK: {
+          String fileId = rollbackRequest.getFileId().get();
+          String latestBaseInstant = 
rollbackRequest.getLatestBaseInstant().get();
+          HoodieWriteStat writeStat = rollbackRequest.getWriteStat().get();
+
+          Path fullLogFilePath = 
FSUtils.getPartitionPath(config.getBasePath(), writeStat.getPath());
+
+          Map<String, Long> logFilesWithBlocksToRollback =
+              Collections.singletonMap(fullLogFilePath.toString(), 
writeStat.getTotalWriteBytes());
+
+          return new HoodieRollbackRequest(rollbackRequest.getPartitionPath(), 
fileId, latestBaseInstant,
+              Collections.EMPTY_LIST, logFilesWithBlocksToRollback);
+        }
+        default:
+          throw new IllegalStateException("Unknown Rollback action " + 
rollbackRequest);
+      }
+    }, numPartitions);
+  }
+
+  private FileStatus[] getBaseFilesToBeDeleted(HoodieTableMetaClient 
metaClient, HoodieWriteConfig config,
+                                               String commit, String 
partitionPath, FileSystem fs) throws IOException {
+    LOG.info("Collecting files to be cleaned/rolledback up for path " + 
partitionPath + " and commit " + commit);
+    String basefileExtension = 
metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
+    PathFilter filter = (path) -> {
+      if (path.toString().contains(basefileExtension)) {
+        String fileCommitTime = FSUtils.getCommitTime(path.getName());
+        return commit.equals(fileCommitTime);
+      }
+      return false;
+    };
+    return fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), 
partitionPath), filter);
+  }
+
+  private FileStatus[] getBaseAndLogFilesToBeDeleted(String commit, String 
partitionPath, FileSystem fs) throws IOException {
+    String basefileExtension = 
metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
+    BaseRollbackHelper.SerializablePathFilter filter = (path) -> {
+      if (path.toString().endsWith(basefileExtension)) {
+        String fileCommitTime = FSUtils.getCommitTime(path.getName());
+        return commit.equals(fileCommitTime);
+      } else if (FSUtils.isLogFile(path)) {
+        // Since the baseCommitTime is the only commit for new log files, it's 
okay here
+        String fileCommitTime = FSUtils.getBaseCommitTimeFromLogPath(path);
+        return commit.equals(fileCommitTime);
+      }
+      return false;
+    };
+    return fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), 
partitionPath), filter);
+  }
+}
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
index ed37798607..e6355526e5 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
@@ -18,42 +18,19 @@
 
 package org.apache.hudi.table.action.rollback;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
 import org.apache.hudi.avro.model.HoodieRollbackRequest;
 import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
-import org.apache.hudi.common.model.FileSlice;
-import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieTableType;
-import org.apache.hudi.common.model.HoodieWriteStat;
-import org.apache.hudi.common.model.WriteOperationType;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieRollbackException;
 import org.apache.hudi.table.HoodieTable;
+
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
-import org.jetbrains.annotations.NotNull;
 
-import static 
org.apache.hudi.client.utils.MetadataConversionUtils.getHoodieCommitMetadata;
-import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
-import static 
org.apache.hudi.table.action.rollback.BaseRollbackHelper.EMPTY_STRING;
+import java.io.IOException;
+import java.util.List;
 
 /**
  * Listing based rollback strategy to fetch list of {@link 
HoodieRollbackRequest}s.
@@ -62,15 +39,12 @@ public class ListingBasedRollbackStrategy implements 
BaseRollbackPlanActionExecu
 
   private static final Logger LOG = 
LogManager.getLogger(ListingBasedRollbackStrategy.class);
 
-  protected final HoodieTable<?, ?, ?, ?> table;
-
-  protected final transient HoodieEngineContext context;
-
+  protected final HoodieTable table;
+  protected final HoodieEngineContext context;
   protected final HoodieWriteConfig config;
-
   protected final String instantTime;
 
-  public ListingBasedRollbackStrategy(HoodieTable<?, ?, ?, ?> table,
+  public ListingBasedRollbackStrategy(HoodieTable table,
                                       HoodieEngineContext context,
                                       HoodieWriteConfig config,
                                       String instantTime) {
@@ -83,260 +57,20 @@ public class ListingBasedRollbackStrategy implements 
BaseRollbackPlanActionExecu
   @Override
   public List<HoodieRollbackRequest> getRollbackRequests(HoodieInstant 
instantToRollback) {
     try {
-      HoodieTableMetaClient metaClient = table.getMetaClient();
-      List<String> partitionPaths =
-          FSUtils.getAllPartitionPaths(context, 
table.getMetaClient().getBasePath(), false, false);
-      int numPartitions = Math.max(Math.min(partitionPaths.size(), 
config.getRollbackParallelism()), 1);
-
-      context.setJobStatus(this.getClass().getSimpleName(), "Creating Listing 
Rollback Plan");
-
-      HoodieTableType tableType = table.getMetaClient().getTableType();
-      String baseFileExtension = getBaseFileExtension(metaClient);
-      Option<HoodieCommitMetadata> commitMetadataOptional = 
getHoodieCommitMetadata(metaClient, instantToRollback);
-      Boolean isCommitMetadataCompleted = 
checkCommitMetadataCompleted(instantToRollback, commitMetadataOptional);
-
-      return context.flatMap(partitionPaths, partitionPath -> {
-        List<HoodieRollbackRequest> hoodieRollbackRequests = new 
ArrayList<>(partitionPaths.size());
-        FileStatus[] filesToDelete =
-            fetchFilesFromInstant(instantToRollback, partitionPath, 
metaClient.getBasePath(), baseFileExtension,
-                metaClient.getFs(), commitMetadataOptional, 
isCommitMetadataCompleted);
-
-        if (HoodieTableType.COPY_ON_WRITE == tableType) {
-          hoodieRollbackRequests.add(getHoodieRollbackRequest(partitionPath, 
filesToDelete));
-        } else if (HoodieTableType.MERGE_ON_READ == tableType) {
-          String commit = instantToRollback.getTimestamp();
-          HoodieActiveTimeline activeTimeline = 
table.getMetaClient().reloadActiveTimeline();
-          switch (instantToRollback.getAction()) {
-            case HoodieTimeline.COMMIT_ACTION:
-            case HoodieTimeline.REPLACE_COMMIT_ACTION:
-              
hoodieRollbackRequests.add(getHoodieRollbackRequest(partitionPath, 
filesToDelete));
-              break;
-            case HoodieTimeline.COMPACTION_ACTION:
-              // If there is no delta commit present after the current commit 
(if compaction), no action, else we
-              // need to make sure that a compaction commit rollback also 
deletes any log files written as part of the
-              // succeeding deltacommit.
-              boolean higherDeltaCommits =
-                  
!activeTimeline.getDeltaCommitTimeline().filterCompletedInstants().findInstantsAfter(commit,
 1)
-                      .empty();
-              if (higherDeltaCommits) {
-                // Rollback of a compaction action with no higher deltacommit 
means that the compaction is scheduled
-                // and has not yet finished. In this scenario we should delete 
only the newly created base files
-                // and not corresponding base commit log files created with 
this as baseCommit since updates would
-                // have been written to the log files.
-                
hoodieRollbackRequests.add(getHoodieRollbackRequest(partitionPath,
-                    listFilesToBeDeleted(instantToRollback.getTimestamp(), 
baseFileExtension, partitionPath,
-                        metaClient.getFs())));
-              } else {
-                // No deltacommits present after this compaction commit 
(inflight or requested). In this case, we
-                // can also delete any log files that were created with this 
compaction commit as base
-                // commit.
-                
hoodieRollbackRequests.add(getHoodieRollbackRequest(partitionPath, 
filesToDelete));
-              }
-              break;
-            case HoodieTimeline.DELTA_COMMIT_ACTION:
-              // 
--------------------------------------------------------------------------------------------------
-              // (A) The following cases are possible if 
index.canIndexLogFiles and/or index.isGlobal
-              // 
--------------------------------------------------------------------------------------------------
-              // (A.1) Failed first commit - Inserts were written to log files 
and HoodieWriteStat has no entries. In
-              // this scenario we would want to delete these log files.
-              // (A.2) Failed recurring commit - Inserts/Updates written to 
log files. In this scenario,
-              // HoodieWriteStat will have the baseCommitTime for the first 
log file written, add rollback blocks.
-              // (A.3) Rollback triggered for first commit - Inserts were 
written to the log files but the commit is
-              // being reverted. In this scenario, HoodieWriteStat will be 
`null` for the attribute prevCommitTime and
-              // and hence will end up deleting these log files. This is done 
so there are no orphan log files
-              // lying around.
-              // (A.4) Rollback triggered for recurring commits - 
Inserts/Updates are being rolled back, the actions
-              // taken in this scenario is a combination of (A.2) and (A.3)
-              // 
---------------------------------------------------------------------------------------------------
-              // (B) The following cases are possible if 
!index.canIndexLogFiles and/or !index.isGlobal
-              // 
---------------------------------------------------------------------------------------------------
-              // (B.1) Failed first commit - Inserts were written to base 
files and HoodieWriteStat has no entries.
-              // In this scenario, we delete all the base files written for 
the failed commit.
-              // (B.2) Failed recurring commits - Inserts were written to base 
files and updates to log files. In
-              // this scenario, perform (A.1) and for updates written to log 
files, write rollback blocks.
-              // (B.3) Rollback triggered for first commit - Same as (B.1)
-              // (B.4) Rollback triggered for recurring commits - Same as 
(B.2) plus we need to delete the log files
-              // as well if the base base file gets deleted.
-              HoodieCommitMetadata commitMetadata = 
HoodieCommitMetadata.fromBytes(
-                  
table.getMetaClient().getCommitTimeline().getInstantDetails(instantToRollback).get(),
-                  HoodieCommitMetadata.class);
-
-              // In case all data was inserts and the commit failed, delete 
the file belonging to that commit
-              // We do not know fileIds for inserts (first inserts are either 
log files or base files),
-              // delete all files for the corresponding failed commit, if 
present (same as COW)
-              
hoodieRollbackRequests.add(getHoodieRollbackRequest(partitionPath, 
filesToDelete));
-
-              // append rollback blocks for updates and inserts as A.2 and B.2
-              if 
(commitMetadata.getPartitionToWriteStats().containsKey(partitionPath)) {
-                hoodieRollbackRequests.addAll(
-                    getRollbackRequestToAppend(partitionPath, 
instantToRollback, commitMetadata, table));
-              }
-              break;
-            default:
-              throw new HoodieRollbackException("Unknown listing type, during 
rollback of " + instantToRollback);
-          }
-        } else {
-          throw new HoodieRollbackException(
-              String.format("Unsupported table type: %s, during listing 
rollback of %s", tableType, instantToRollback));
-        }
-        return hoodieRollbackRequests.stream();
-      }, numPartitions);
-    } catch (Exception e) {
+      List<ListingBasedRollbackRequest> rollbackRequests = null;
+      if (table.getMetaClient().getTableType() == 
HoodieTableType.COPY_ON_WRITE) {
+        rollbackRequests = 
RollbackUtils.generateRollbackRequestsByListingCOW(context,
+            table.getMetaClient().getBasePath());
+      } else {
+        rollbackRequests = RollbackUtils
+            .generateRollbackRequestsUsingFileListingMOR(instantToRollback, 
table, context);
+      }
+      List<HoodieRollbackRequest> listingBasedRollbackRequests = new 
ListingBasedRollbackHelper(table.getMetaClient(), config)
+          .getRollbackRequestsForRollbackPlan(context, instantToRollback, 
rollbackRequests);
+      return listingBasedRollbackRequests;
+    } catch (IOException e) {
       LOG.error("Generating rollback requests failed for " + 
instantToRollback.getTimestamp(), e);
       throw new HoodieRollbackException("Generating rollback requests failed 
for " + instantToRollback.getTimestamp(), e);
     }
   }
-
-  private String getBaseFileExtension(HoodieTableMetaClient metaClient) {
-    return metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
-  }
-
-  @NotNull
-  private HoodieRollbackRequest getHoodieRollbackRequest(String partitionPath, 
FileStatus[] filesToDeletedStatus) {
-    List<String> filesToDelete = getFilesToBeDeleted(filesToDeletedStatus);
-    return new HoodieRollbackRequest(
-        partitionPath, EMPTY_STRING, EMPTY_STRING, filesToDelete, 
Collections.emptyMap());
-  }
-
-  @NotNull
-  private List<String> getFilesToBeDeleted(FileStatus[] 
dataFilesToDeletedStatus) {
-    return Arrays.stream(dataFilesToDeletedStatus).map(fileStatus -> {
-      String dataFileToBeDeleted = fileStatus.getPath().toString();
-      // strip scheme E.g: file:/var/folders
-      return dataFileToBeDeleted.substring(dataFileToBeDeleted.indexOf(":") + 
1);
-    }).collect(Collectors.toList());
-  }
-
-  private FileStatus[] listFilesToBeDeleted(String commit, String 
basefileExtension, String partitionPath,
-                                            FileSystem fs) throws IOException {
-    LOG.info("Collecting files to be cleaned/rolledback up for path " + 
partitionPath + " and commit " + commit);
-    PathFilter filter = (path) -> {
-      if (path.toString().contains(basefileExtension)) {
-        String fileCommitTime = FSUtils.getCommitTime(path.getName());
-        return commit.equals(fileCommitTime);
-      }
-      return false;
-    };
-    return fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), 
partitionPath), filter);
-  }
-
-  private FileStatus[] fetchFilesFromInstant(HoodieInstant instantToRollback, 
String partitionPath, String basePath,
-                                             String baseFileExtension, 
HoodieWrapperFileSystem fs,
-                                             Option<HoodieCommitMetadata> 
commitMetadataOptional,
-                                             Boolean 
isCommitMetadataCompleted) throws IOException {
-    if (isCommitMetadataCompleted) {
-      return fetchFilesFromCommitMetadata(instantToRollback, partitionPath, 
basePath, commitMetadataOptional.get(),
-          baseFileExtension, fs);
-    } else {
-      return fetchFilesFromListFiles(instantToRollback, partitionPath, 
basePath, baseFileExtension, fs);
-    }
-  }
-
-  private FileStatus[] fetchFilesFromCommitMetadata(HoodieInstant 
instantToRollback, String partitionPath,
-                                                    String basePath, 
HoodieCommitMetadata commitMetadata,
-                                                    String baseFileExtension, 
HoodieWrapperFileSystem fs)
-      throws IOException {
-    SerializablePathFilter pathFilter = 
getSerializablePathFilter(baseFileExtension, instantToRollback.getTimestamp());
-    Path[] filePaths = getFilesFromCommitMetadata(basePath, commitMetadata, 
partitionPath);
-
-    return fs.listStatus(filePaths, pathFilter);
-  }
-
-  private FileStatus[] fetchFilesFromListFiles(HoodieInstant 
instantToRollback, String partitionPath, String basePath,
-                                               String baseFileExtension, 
HoodieWrapperFileSystem fs)
-      throws IOException {
-    SerializablePathFilter pathFilter = 
getSerializablePathFilter(baseFileExtension, instantToRollback.getTimestamp());
-    Path[] filePaths = listFilesToBeDeleted(basePath, partitionPath);
-
-    return fs.listStatus(filePaths, pathFilter);
-  }
-
-  private Boolean checkCommitMetadataCompleted(HoodieInstant instantToRollback,
-                                               Option<HoodieCommitMetadata> 
commitMetadataOptional) {
-    return commitMetadataOptional.isPresent() && 
instantToRollback.isCompleted()
-        && 
!WriteOperationType.UNKNOWN.equals(commitMetadataOptional.get().getOperationType());
-  }
-
-  private static Path[] listFilesToBeDeleted(String basePath, String 
partitionPath) {
-    return new Path[] {FSUtils.getPartitionPath(basePath, partitionPath)};
-  }
-
-  private static Path[] getFilesFromCommitMetadata(String basePath, 
HoodieCommitMetadata commitMetadata, String partitionPath) {
-    List<String> fullPaths = 
commitMetadata.getFullPathsByPartitionPath(basePath, partitionPath);
-    return fullPaths.stream().map(Path::new).toArray(Path[]::new);
-  }
-
-  @NotNull
-  private static SerializablePathFilter getSerializablePathFilter(String 
basefileExtension, String commit) {
-    return (path) -> {
-      if (path.toString().endsWith(basefileExtension)) {
-        String fileCommitTime = FSUtils.getCommitTime(path.getName());
-        return commit.equals(fileCommitTime);
-      } else if (FSUtils.isLogFile(path)) {
-        // Since the baseCommitTime is the only commit for new log files, it's 
okay here
-        String fileCommitTime = FSUtils.getBaseCommitTimeFromLogPath(path);
-        return commit.equals(fileCommitTime);
-      }
-      return false;
-    };
-  }
-
-  public static List<HoodieRollbackRequest> getRollbackRequestToAppend(String 
partitionPath, HoodieInstant rollbackInstant,
-                                                                       
HoodieCommitMetadata commitMetadata, HoodieTable<?, ?, ?, ?> table) {
-    List<HoodieRollbackRequest> hoodieRollbackRequests =  new ArrayList<>();
-    
checkArgument(rollbackInstant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION));
-
-    // wStat.getPrevCommit() might not give the right commit time in the 
following
-    // scenario : If a compaction was scheduled, the new commitTime associated 
with the requested compaction will be
-    // used to write the new log files. In this case, the commit time for the 
log file is the compaction requested time.
-    // But the index (global) might store the baseCommit of the base and not 
the requested, hence get the
-    // baseCommit always by listing the file slice
-    // With multi writers, rollbacks could be lazy. and so we need to use 
getLatestFileSlicesBeforeOrOn() instead of getLatestFileSlices()
-    Map<String, FileSlice> latestFileSlices = table.getSliceView()
-        .getLatestFileSlicesBeforeOrOn(partitionPath, 
rollbackInstant.getTimestamp(), true)
-        .collect(Collectors.toMap(FileSlice::getFileId, Function.identity()));
-
-    List<HoodieWriteStat> hoodieWriteStats = 
commitMetadata.getPartitionToWriteStats().get(partitionPath)
-        .stream()
-        .filter(writeStat -> {
-          // Filter out stats without prevCommit since they are all inserts
-          boolean validForRollback = (writeStat != null) && 
(!writeStat.getPrevCommit().equals(HoodieWriteStat.NULL_COMMIT))
-              && (writeStat.getPrevCommit() != null) && 
latestFileSlices.containsKey(writeStat.getFileId());
-
-          if (!validForRollback) {
-            return false;
-          }
-
-          FileSlice latestFileSlice = 
latestFileSlices.get(writeStat.getFileId());
-
-          // For sanity, log-file base-instant time can never be less than 
base-commit on which we are rolling back
-          checkArgument(
-              
HoodieTimeline.compareTimestamps(latestFileSlice.getBaseInstantTime(),
-                  HoodieTimeline.LESSER_THAN_OR_EQUALS, 
rollbackInstant.getTimestamp()),
-              "Log-file base-instant could not be less than the instant being 
rolled back");
-
-          // Command block "rolling back" the preceding block {@link 
HoodieCommandBlockTypeEnum#ROLLBACK_PREVIOUS_BLOCK}
-          // w/in the latest file-slice is appended iff base-instant of the 
log-file is _strictly_ less
-          // than the instant of the Delta Commit being rolled back. 
Otherwise, log-file will be cleaned up
-          // in a different branch of the flow.
-          return 
HoodieTimeline.compareTimestamps(latestFileSlice.getBaseInstantTime(), 
HoodieTimeline.LESSER_THAN, rollbackInstant.getTimestamp());
-        })
-        .collect(Collectors.toList());
-
-    for (HoodieWriteStat writeStat : hoodieWriteStats) {
-      FileSlice latestFileSlice = latestFileSlices.get(writeStat.getFileId());
-      String fileId = writeStat.getFileId();
-      String latestBaseInstant = latestFileSlice.getBaseInstantTime();
-
-      Path fullLogFilePath = 
FSUtils.getPartitionPath(table.getConfig().getBasePath(), writeStat.getPath());
-
-      Map<String, Long> logFilesWithBlocksToRollback =
-          Collections.singletonMap(fullLogFilePath.toString(), 
writeStat.getTotalWriteBytes());
-
-      hoodieRollbackRequests.add(new HoodieRollbackRequest(partitionPath, 
fileId, latestBaseInstant,
-          Collections.emptyList(), logFilesWithBlocksToRollback));
-    }
-
-    return hoodieRollbackRequests;
-  }
 }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java
index ce7a185151..2bc9b59b0d 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java
@@ -21,13 +21,21 @@ package org.apache.hudi.table.action.rollback;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hudi.avro.model.HoodieRollbackPlan;
 import org.apache.hudi.common.HoodieRollbackStat;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
@@ -36,6 +44,9 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
 
@@ -91,4 +102,160 @@ public class RollbackUtils {
     return new HoodieRollbackStat(stat1.getPartitionPath(), 
successDeleteFiles, failedDeleteFiles, commandBlocksCount);
   }
 
+  /**
+   * Generate all rollback requests that needs rolling back this action 
without actually performing rollback for COW table type.
+   * @param engineContext instance of {@link HoodieEngineContext} to use.
+   * @param basePath base path of interest.
+   * @return {@link List} of {@link ListingBasedRollbackRequest}s thus 
collected.
+   */
+  public static List<ListingBasedRollbackRequest> 
generateRollbackRequestsByListingCOW(HoodieEngineContext engineContext, String 
basePath) {
+    return FSUtils.getAllPartitionPaths(engineContext, basePath, false, 
false).stream()
+        
.map(ListingBasedRollbackRequest::createRollbackRequestWithDeleteDataAndLogFilesAction)
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Generate all rollback requests that we need to perform for rolling back 
this action without actually performing rolling back for MOR table type.
+   *
+   * @param instantToRollback Instant to Rollback
+   * @param table instance of {@link HoodieTable} to use.
+   * @param context instance of {@link HoodieEngineContext} to use.
+   * @return list of rollback requests
+   */
+  public static List<ListingBasedRollbackRequest> 
generateRollbackRequestsUsingFileListingMOR(HoodieInstant instantToRollback, 
HoodieTable table, HoodieEngineContext context) throws IOException {
+    String commit = instantToRollback.getTimestamp();
+    HoodieWriteConfig config = table.getConfig();
+    List<String> partitions = FSUtils.getAllPartitionPaths(context, 
table.getMetaClient().getBasePath(), false, false);
+    if (partitions.isEmpty()) {
+      return new ArrayList<>();
+    }
+    int sparkPartitions = Math.max(Math.min(partitions.size(), 
config.getRollbackParallelism()), 1);
+    context.setJobStatus(RollbackUtils.class.getSimpleName(), "Generate all 
rollback requests");
+    return context.flatMap(partitions, partitionPath -> {
+      HoodieActiveTimeline activeTimeline = 
table.getMetaClient().reloadActiveTimeline();
+      List<ListingBasedRollbackRequest> partitionRollbackRequests = new 
ArrayList<>();
+      switch (instantToRollback.getAction()) {
+        case HoodieTimeline.COMMIT_ACTION:
+        case HoodieTimeline.REPLACE_COMMIT_ACTION:
+          LOG.info("Rolling back commit action.");
+          partitionRollbackRequests.add(
+              
ListingBasedRollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath));
+          break;
+        case HoodieTimeline.COMPACTION_ACTION:
+          // If there is no delta commit present after the current commit (if 
compaction), no action, else we
+          // need to make sure that a compaction commit rollback also deletes 
any log files written as part of the
+          // succeeding deltacommit.
+          boolean higherDeltaCommits =
+              
!activeTimeline.getDeltaCommitTimeline().filterCompletedInstants().findInstantsAfter(commit,
 1).empty();
+          if (higherDeltaCommits) {
+            // Rollback of a compaction action with no higher deltacommit 
means that the compaction is scheduled
+            // and has not yet finished. In this scenario we should delete 
only the newly created base files
+            // and not corresponding base commit log files created with this 
as baseCommit since updates would
+            // have been written to the log files.
+            LOG.info("Rolling back compaction. There are higher delta commits. 
So only deleting data files");
+            partitionRollbackRequests.add(
+                
ListingBasedRollbackRequest.createRollbackRequestWithDeleteDataFilesOnlyAction(partitionPath));
+          } else {
+            // No deltacommits present after this compaction commit (inflight 
or requested). In this case, we
+            // can also delete any log files that were created with this 
compaction commit as base
+            // commit.
+            LOG.info("Rolling back compaction plan. There are NO higher delta 
commits. So deleting both data and"
+                + " log files");
+            partitionRollbackRequests.add(
+                
ListingBasedRollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath));
+          }
+          break;
+        case HoodieTimeline.DELTA_COMMIT_ACTION:
+          // 
--------------------------------------------------------------------------------------------------
+          // (A) The following cases are possible if index.canIndexLogFiles 
and/or index.isGlobal
+          // 
--------------------------------------------------------------------------------------------------
+          // (A.1) Failed first commit - Inserts were written to log files and 
HoodieWriteStat has no entries. In
+          // this scenario we would want to delete these log files.
+          // (A.2) Failed recurring commit - Inserts/Updates written to log 
files. In this scenario,
+          // HoodieWriteStat will have the baseCommitTime for the first log 
file written, add rollback blocks.
+          // (A.3) Rollback triggered for first commit - Inserts were written 
to the log files but the commit is
+          // being reverted. In this scenario, HoodieWriteStat will be `null` 
for the attribute prevCommitTime and
+          // hence will end up deleting these log files. This is done so 
there are no orphan log files
+          // lying around.
+          // (A.4) Rollback triggered for recurring commits - Inserts/Updates 
are being rolled back, the actions
+          // taken in this scenario is a combination of (A.2) and (A.3)
+          // 
---------------------------------------------------------------------------------------------------
+          // (B) The following cases are possible if !index.canIndexLogFiles 
and/or !index.isGlobal
+          // 
---------------------------------------------------------------------------------------------------
+          // (B.1) Failed first commit - Inserts were written to base files 
and HoodieWriteStat has no entries.
+          // In this scenario, we delete all the base files written for the 
failed commit.
+          // (B.2) Failed recurring commits - Inserts were written to base 
files and updates to log files. In
+          // this scenario, perform (A.1) and for updates written to log 
files, write rollback blocks.
+          // (B.3) Rollback triggered for first commit - Same as (B.1)
+          // (B.4) Rollback triggered for recurring commits - Same as (B.2) 
plus we need to delete the log files
as well if the base file gets deleted.
+          HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
+              
table.getMetaClient().getCommitTimeline().getInstantDetails(instantToRollback).get(),
+              HoodieCommitMetadata.class);
+
+          // In case all data was inserts and the commit failed, delete the 
file belonging to that commit
+          // We do not know fileIds for inserts (first inserts are either log 
files or base files),
+          // delete all files for the corresponding failed commit, if present 
(same as COW)
+          partitionRollbackRequests.add(
+              
ListingBasedRollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath));
+
+          // append rollback blocks for updates and inserts as A.2 and B.2
+          if 
(commitMetadata.getPartitionToWriteStats().containsKey(partitionPath)) {
+            partitionRollbackRequests
+                .addAll(generateAppendRollbackBlocksAction(partitionPath, 
instantToRollback, commitMetadata, table));
+          }
+          break;
+        default:
+          break;
+      }
+      return partitionRollbackRequests.stream();
+    }, Math.min(partitions.size(), 
sparkPartitions)).stream().filter(Objects::nonNull).collect(Collectors.toList());
+  }
+
+  private static List<ListingBasedRollbackRequest> 
generateAppendRollbackBlocksAction(String partitionPath, HoodieInstant 
rollbackInstant,
+      HoodieCommitMetadata commitMetadata, HoodieTable table) {
+    
checkArgument(rollbackInstant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION));
+
+    // wStat.getPrevCommit() might not give the right commit time in the 
following
+    // scenario : If a compaction was scheduled, the new commitTime associated 
with the requested compaction will be
+    // used to write the new log files. In this case, the commit time for the 
log file is the compaction requested time.
+    // But the index (global) might store the baseCommit of the base and not 
the requested, hence get the
+    // baseCommit always by listing the file slice
+    // With multi writers, rollbacks could be lazy. and so we need to use 
getLatestFileSlicesBeforeOrOn() instead of getLatestFileSlices()
+    Map<String, FileSlice> latestFileSlices = table.getSliceView()
+        .getLatestFileSlicesBeforeOrOn(partitionPath, 
rollbackInstant.getTimestamp(), true)
+        .collect(Collectors.toMap(FileSlice::getFileId, Function.identity()));
+
+    return commitMetadata.getPartitionToWriteStats().get(partitionPath)
+        .stream()
+        .filter(writeStat -> {
+          // Filter out stats without prevCommit since they are all inserts
+          boolean validForRollback = (writeStat != null) && 
(!writeStat.getPrevCommit().equals(HoodieWriteStat.NULL_COMMIT))
+              && (writeStat.getPrevCommit() != null) && 
latestFileSlices.containsKey(writeStat.getFileId());
+
+          if (!validForRollback) {
+            return false;
+          }
+
+          FileSlice latestFileSlice = 
latestFileSlices.get(writeStat.getFileId());
+
+          // For sanity, log-file base-instant time can never be greater than 
the instant being rolled back
+          checkArgument(
+              
HoodieTimeline.compareTimestamps(latestFileSlice.getBaseInstantTime(),
+                  HoodieTimeline.LESSER_THAN_OR_EQUALS, 
rollbackInstant.getTimestamp()),
+              "Log-file base-instant could not be less than the instant being 
rolled back");
+
+          // Command block "rolling back" the preceding block {@link 
HoodieCommandBlockTypeEnum#ROLLBACK_PREVIOUS_BLOCK}
+          // w/in the latest file-slice is appended iff base-instant of the 
log-file is _strictly_ less
+          // than the instant of the Delta Commit being rolled back. 
Otherwise, log-file will be cleaned up
+          // in a different branch of the flow.
+          return 
HoodieTimeline.compareTimestamps(latestFileSlice.getBaseInstantTime(), 
HoodieTimeline.LESSER_THAN, rollbackInstant.getTimestamp());
+        })
+        .map(writeStat -> {
+          FileSlice latestFileSlice = 
latestFileSlices.get(writeStat.getFileId());
+          return 
ListingBasedRollbackRequest.createRollbackRequestWithAppendRollbackBlockAction(partitionPath,
+              writeStat.getFileId(), latestFileSlice.getBaseInstantTime(), 
writeStat);
+        })
+        .collect(Collectors.toList());
+  }
 }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/SerializablePathFilter.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/SerializablePathFilter.java
deleted file mode 100644
index e2affdf5ca..0000000000
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/SerializablePathFilter.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.action.rollback;
-
-import org.apache.hadoop.fs.PathFilter;
-
-import java.io.Serializable;
-
-public interface SerializablePathFilter extends PathFilter, Serializable {
-}
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
index 42add690f2..6a114154c8 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
@@ -18,14 +18,14 @@
 
 package org.apache.hudi.table.upgrade;
 
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
 import org.apache.hudi.avro.model.HoodieRollbackRequest;
 import org.apache.hudi.common.HoodieRollbackStat;
 import org.apache.hudi.common.config.ConfigProperty;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.marker.MarkerType;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -35,10 +35,15 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieRollbackException;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.rollback.BaseRollbackHelper;
-import org.apache.hudi.table.action.rollback.ListingBasedRollbackStrategy;
+import org.apache.hudi.table.action.rollback.ListingBasedRollbackHelper;
+import org.apache.hudi.table.action.rollback.ListingBasedRollbackRequest;
+import org.apache.hudi.table.action.rollback.RollbackUtils;
 import org.apache.hudi.table.marker.WriteMarkers;
 import org.apache.hudi.table.marker.WriteMarkersFactory;
 
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -95,7 +100,14 @@ public class ZeroToOneUpgradeHandler implements 
UpgradeHandler {
         writeMarkers.quietDeleteMarkerDir(context, parallelism);
 
         // generate rollback stats
-        List<HoodieRollbackStat> rollbackStats = 
getListBasedRollBackStats(table, context, commitInstantOpt);
+        List<ListingBasedRollbackRequest> rollbackRequests;
+        if (table.getMetaClient().getTableType() == 
HoodieTableType.COPY_ON_WRITE) {
+          rollbackRequests = 
RollbackUtils.generateRollbackRequestsByListingCOW(context, 
table.getMetaClient().getBasePath());
+        } else {
+          rollbackRequests = 
RollbackUtils.generateRollbackRequestsUsingFileListingMOR(commitInstantOpt.get(),
 table, context);
+        }
+        List<HoodieRollbackStat> rollbackStats = 
getListBasedRollBackStats(table.getMetaClient(), table.getConfig(),
+            context, commitInstantOpt, rollbackRequests);
 
         // recreate markers adhering to marker based rollback
         for (HoodieRollbackStat rollbackStat : rollbackStats) {
@@ -114,12 +126,12 @@ public class ZeroToOneUpgradeHandler implements 
UpgradeHandler {
     }
   }
 
-  List<HoodieRollbackStat> getListBasedRollBackStats(HoodieTable<?, ?, ?, ?> 
table, HoodieEngineContext context, Option<HoodieInstant> commitInstantOpt) {
-    List<HoodieRollbackRequest> hoodieRollbackRequests =
-        new ListingBasedRollbackStrategy(table, context, table.getConfig(), 
commitInstantOpt.get().getTimestamp())
-            .getRollbackRequests(commitInstantOpt.get());
-    return new BaseRollbackHelper(table.getMetaClient(), table.getConfig())
-        .collectRollbackStats(context, commitInstantOpt.get(), 
hoodieRollbackRequests);
+  List<HoodieRollbackStat> getListBasedRollBackStats(
+      HoodieTableMetaClient metaClient, HoodieWriteConfig config, 
HoodieEngineContext context,
+      Option<HoodieInstant> commitInstantOpt, 
List<ListingBasedRollbackRequest> rollbackRequests) {
+    List<HoodieRollbackRequest> hoodieRollbackRequests = new 
ListingBasedRollbackHelper(metaClient, config)
+        .getRollbackRequestsForRollbackPlan(context, commitInstantOpt.get(), 
rollbackRequests);
+    return new BaseRollbackHelper(metaClient, 
config).collectRollbackStats(context, commitInstantOpt.get(), 
hoodieRollbackRequests);
   }
 
   /**
@@ -131,7 +143,7 @@ public class ZeroToOneUpgradeHandler implements 
UpgradeHandler {
    * @param table       {@link HoodieTable} instance to use
    * @return the marker file name thus curated.
    */
-  private static String getFileNameForMarkerFromLogFile(String logFilePath, 
HoodieTable<?, ?, ?, ?> table) {
+  private static String getFileNameForMarkerFromLogFile(String logFilePath, 
HoodieTable table) {
     Path logPath = new Path(table.getMetaClient().getBasePath(), logFilePath);
     String fileId = FSUtils.getFileIdFromLogPath(logPath);
     String baseInstant = FSUtils.getBaseCommitTimeFromLogPath(logPath);
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java
index d8ce6612a4..c9e3fed871 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java
@@ -45,6 +45,7 @@ import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.marker.WriteMarkersFactory;
 import org.apache.hudi.testutils.MetadataMergeWriteStatus;
+
 import org.apache.spark.api.java.JavaRDD;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
@@ -124,8 +125,8 @@ public class TestMergeOnReadRollbackActionExecutor extends 
HoodieClientRollbackT
 
     for (Map.Entry<String, HoodieRollbackPartitionMetadata> entry : 
rollbackMetadata.entrySet()) {
       HoodieRollbackPartitionMetadata meta = entry.getValue();
-      assertEquals(0, meta.getFailedDeleteFiles().size());
-      assertEquals(0, meta.getSuccessDeleteFiles().size());
+      assertTrue(meta.getFailedDeleteFiles() == null || 
meta.getFailedDeleteFiles().size() == 0);
+      assertTrue(meta.getSuccessDeleteFiles() == null || 
meta.getSuccessDeleteFiles().size() == 0);
     }
 
     //4. assert file group after rollback, and compare to the rollbackstat
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
index 53ceb00409..40bc3c8280 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
@@ -138,19 +138,6 @@ public class HoodieCommitMetadata implements Serializable {
     return fullPaths;
   }
 
-  public List<String> getFullPathsByPartitionPath(String basePath, String 
partitionPath) {
-    HashSet<String> fullPaths = new HashSet<>();
-    if (getPartitionToWriteStats().get(partitionPath) != null) {
-      for (HoodieWriteStat stat : 
getPartitionToWriteStats().get(partitionPath)) {
-        if ((stat.getFileId() != null)) {
-          String fullPath = FSUtils.getPartitionPath(basePath, 
stat.getPath()).toString();
-          fullPaths.add(fullPath);
-        }
-      }
-    }
-    return new ArrayList<>(fullPaths);
-  }
-
   public Map<HoodieFileGroupId, String> getFileGroupIdAndFullPaths(String 
basePath) {
     Map<HoodieFileGroupId, String> fileGroupIdToFullPaths = new HashMap<>();
     for (Map.Entry<String, List<HoodieWriteStat>> entry : 
getPartitionToWriteStats().entrySet()) {

Reply via email to