alexeykudinkin commented on a change in pull request #4957:
URL: https://github.com/apache/hudi/pull/4957#discussion_r831683651



##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
##########
@@ -57,20 +84,242 @@ public ListingBasedRollbackStrategy(HoodieTable table,
   @Override
   public List<HoodieRollbackRequest> getRollbackRequests(HoodieInstant 
instantToRollback) {
     try {
-      List<ListingBasedRollbackRequest> rollbackRequests = null;
-      if (table.getMetaClient().getTableType() == 
HoodieTableType.COPY_ON_WRITE) {
-        rollbackRequests = 
RollbackUtils.generateRollbackRequestsByListingCOW(context,
-            table.getMetaClient().getBasePath());
-      } else {
-        rollbackRequests = RollbackUtils
-            .generateRollbackRequestsUsingFileListingMOR(instantToRollback, 
table, context);
-      }
-      List<HoodieRollbackRequest> listingBasedRollbackRequests = new 
ListingBasedRollbackHelper(table.getMetaClient(), config)
-          .getRollbackRequestsForRollbackPlan(context, instantToRollback, 
rollbackRequests);
-      return listingBasedRollbackRequests;
-    } catch (IOException e) {
+      HoodieTableMetaClient metaClient = table.getMetaClient();
+      List<String> partitionPaths = FSUtils.getAllPartitionPaths(context, 
table.getMetaClient().getBasePath(), false, false);
+      int numPartitions = Math.max(Math.min(partitionPaths.size(), 
config.getRollbackParallelism()), 1);
+
+      context.setJobStatus(this.getClass().getSimpleName(), "Creating Listing 
Rollback Plan");
+
+      return context.flatMap(partitionPaths, partitionPath -> {
+        List<HoodieRollbackRequest> hoodieRollbackRequests = new 
ArrayList<>(partitionPaths.size());
+        List<ListingRollbackActionType> types = 
getListingRollbackType(instantToRollback, table, partitionPath);
+        for (ListingRollbackActionType type : types) {
+          switch (type) {
+            case DELETE_DATA_FILES_ONLY:
+              final FileStatus[] dataFilesToDeletedStatus = 
getBaseFilesToBeDeleted(metaClient, config, instantToRollback.getTimestamp(),
+                  partitionPath, metaClient.getFs());
+              List<String> dataFilesToBeDeleted = 
Arrays.stream(dataFilesToDeletedStatus).map(fileStatus -> {
+                String dataFileToBeDeleted = fileStatus.getPath().toString();
+                // strip scheme
+                return 
dataFileToBeDeleted.substring(dataFileToBeDeleted.indexOf(":") + 1);
+              }).collect(Collectors.toList());
+              hoodieRollbackRequests.add(new 
HoodieRollbackRequest(partitionPath,
+                  EMPTY_STRING, EMPTY_STRING, dataFilesToBeDeleted, 
Collections.emptyMap()));
+              break;
+            case DELETE_DATA_AND_LOG_FILES:
+              final FileStatus[] filesToDeletedStatus = 
getBaseAndLogFilesToBeDeleted(metaClient, instantToRollback, partitionPath);
+              List<String> filesToBeDeleted = 
Arrays.stream(filesToDeletedStatus).map(fileStatus -> {
+                String fileToBeDeleted = fileStatus.getPath().toString();
+                // strip scheme
+                return fileToBeDeleted.substring(fileToBeDeleted.indexOf(":") 
+ 1);
+              }).collect(Collectors.toList());
+              hoodieRollbackRequests.add(new HoodieRollbackRequest(
+                  partitionPath, EMPTY_STRING, EMPTY_STRING, filesToBeDeleted, 
Collections.emptyMap()));
+              break;
+            case APPEND_ROLLBACK_BLOCK:
+              HoodieCommitMetadata commitMetadata = 
HoodieCommitMetadata.fromBytes(
+                  
table.getMetaClient().getCommitTimeline().getInstantDetails(instantToRollback).get(),
+                  HoodieCommitMetadata.class);
+
+              
hoodieRollbackRequests.addAll(getRollbackRequestForAppend(partitionPath, 
instantToRollback, commitMetadata, table));
+              break;
+            default:
+              break;
+          }
+        }
+        return hoodieRollbackRequests.stream();
+      }, numPartitions);
+    } catch (Exception e) {
       LOG.error("Generating rollback requests failed for " + 
instantToRollback.getTimestamp(), e);
       throw new HoodieRollbackException("Generating rollback requests failed 
for " + instantToRollback.getTimestamp(), e);
     }
   }
+
+  private FileStatus[] getBaseFilesToBeDeleted(HoodieTableMetaClient 
metaClient, HoodieWriteConfig config,
+                                               String commit, String 
partitionPath, FileSystem fs) throws IOException {
+    LOG.info("Collecting files to be cleaned/rolledback up for path " + 
partitionPath + " and commit " + commit);
+    String basefileExtension = 
metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
+    PathFilter filter = (path) -> {
+      if (path.toString().contains(basefileExtension)) {
+        String fileCommitTime = FSUtils.getCommitTime(path.getName());
+        return commit.equals(fileCommitTime);
+      }
+      return false;
+    };
+    return fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), 
partitionPath), filter);
+  }
+
+  private FileStatus[] getBaseAndLogFilesToBeDeleted(HoodieTableMetaClient 
metaClient, HoodieInstant instantToRollback, String partitionPath) throws 
IOException {
+    HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
+    HoodieTimeline timeline = 
activeTimeline.getCommitsTimeline().filterCompletedInstants();
+    Option<HoodieCommitMetadata> commitMetadataOptional = 
getHoodieCommitMetadata(timeline, instantToRollback);
+    SerializablePathFilter pathFilter = getSerializablePathFilter(metaClient, 
instantToRollback.getTimestamp());
+    Path[] filePaths;
+    if (commitMetadataOptional.isPresent() && instantToRollback.isCompleted()
+        && 
!WriteOperationType.UNKNOWN.equals(commitMetadataOptional.get().getOperationType()))
 {
+      filePaths = getFilesFromCommitMetadata(metaClient, 
commitMetadataOptional.get(), partitionPath);
+    } else {
+      filePaths = listFilesToBeDeleted(metaClient, partitionPath);
+    }
+
+    return metaClient.getFs().listStatus(filePaths, pathFilter);
+  }
+
+  private static Path[] listFilesToBeDeleted(HoodieTableMetaClient metaClient, 
String partitionPath) {

Review comment:
      If we're only accessing the base path, there's no need to pass the whole 
`metaClient`, right?

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
##########
@@ -18,33 +18,60 @@
 
 package org.apache.hudi.table.action.rollback;
 
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hudi.avro.model.HoodieRollbackRequest;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieRollbackException;
 import org.apache.hudi.table.HoodieTable;
-
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
+import org.jetbrains.annotations.NotNull;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static 
org.apache.hudi.client.utils.MetadataConversionUtils.getHoodieCommitMetadata;
+import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
+import static 
org.apache.hudi.table.action.rollback.BaseRollbackHelper.EMPTY_STRING;
 
 /**
  * Listing based rollback strategy to fetch list of {@link 
HoodieRollbackRequest}s.
  */
-public class ListingBasedRollbackStrategy implements 
BaseRollbackPlanActionExecutor.RollbackStrategy {
+public class ListingBasedRollbackStrategy<T extends HoodieRecordPayload, I, K, 
O> implements BaseRollbackPlanActionExecutor.RollbackStrategy {

Review comment:
      These generic type parameters should not be necessary.

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
##########
@@ -57,20 +84,242 @@ public ListingBasedRollbackStrategy(HoodieTable table,
   @Override
   public List<HoodieRollbackRequest> getRollbackRequests(HoodieInstant 
instantToRollback) {
     try {
-      List<ListingBasedRollbackRequest> rollbackRequests = null;
-      if (table.getMetaClient().getTableType() == 
HoodieTableType.COPY_ON_WRITE) {
-        rollbackRequests = 
RollbackUtils.generateRollbackRequestsByListingCOW(context,
-            table.getMetaClient().getBasePath());
-      } else {
-        rollbackRequests = RollbackUtils
-            .generateRollbackRequestsUsingFileListingMOR(instantToRollback, 
table, context);
-      }
-      List<HoodieRollbackRequest> listingBasedRollbackRequests = new 
ListingBasedRollbackHelper(table.getMetaClient(), config)
-          .getRollbackRequestsForRollbackPlan(context, instantToRollback, 
rollbackRequests);
-      return listingBasedRollbackRequests;
-    } catch (IOException e) {
+      HoodieTableMetaClient metaClient = table.getMetaClient();
+      List<String> partitionPaths = FSUtils.getAllPartitionPaths(context, 
table.getMetaClient().getBasePath(), false, false);
+      int numPartitions = Math.max(Math.min(partitionPaths.size(), 
config.getRollbackParallelism()), 1);
+
+      context.setJobStatus(this.getClass().getSimpleName(), "Creating Listing 
Rollback Plan");
+
+      return context.flatMap(partitionPaths, partitionPath -> {
+        List<HoodieRollbackRequest> hoodieRollbackRequests = new 
ArrayList<>(partitionPaths.size());
+        List<ListingRollbackActionType> types = 
getListingRollbackType(instantToRollback, table, partitionPath);
+        for (ListingRollbackActionType type : types) {
+          switch (type) {
+            case DELETE_DATA_FILES_ONLY:
+              final FileStatus[] dataFilesToDeletedStatus = 
getBaseFilesToBeDeleted(metaClient, config, instantToRollback.getTimestamp(),
+                  partitionPath, metaClient.getFs());
+              List<String> dataFilesToBeDeleted = 
Arrays.stream(dataFilesToDeletedStatus).map(fileStatus -> {
+                String dataFileToBeDeleted = fileStatus.getPath().toString();
+                // strip scheme
+                return 
dataFileToBeDeleted.substring(dataFileToBeDeleted.indexOf(":") + 1);

Review comment:
      Let's extract this into a method and handle the case where the path has no 
scheme.

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
##########
@@ -57,20 +84,242 @@ public ListingBasedRollbackStrategy(HoodieTable table,
   @Override
   public List<HoodieRollbackRequest> getRollbackRequests(HoodieInstant 
instantToRollback) {
     try {
-      List<ListingBasedRollbackRequest> rollbackRequests = null;
-      if (table.getMetaClient().getTableType() == 
HoodieTableType.COPY_ON_WRITE) {
-        rollbackRequests = 
RollbackUtils.generateRollbackRequestsByListingCOW(context,
-            table.getMetaClient().getBasePath());
-      } else {
-        rollbackRequests = RollbackUtils
-            .generateRollbackRequestsUsingFileListingMOR(instantToRollback, 
table, context);
-      }
-      List<HoodieRollbackRequest> listingBasedRollbackRequests = new 
ListingBasedRollbackHelper(table.getMetaClient(), config)
-          .getRollbackRequestsForRollbackPlan(context, instantToRollback, 
rollbackRequests);
-      return listingBasedRollbackRequests;
-    } catch (IOException e) {
+      HoodieTableMetaClient metaClient = table.getMetaClient();
+      List<String> partitionPaths = FSUtils.getAllPartitionPaths(context, 
table.getMetaClient().getBasePath(), false, false);
+      int numPartitions = Math.max(Math.min(partitionPaths.size(), 
config.getRollbackParallelism()), 1);
+
+      context.setJobStatus(this.getClass().getSimpleName(), "Creating Listing 
Rollback Plan");
+
+      return context.flatMap(partitionPaths, partitionPath -> {
+        List<HoodieRollbackRequest> hoodieRollbackRequests = new 
ArrayList<>(partitionPaths.size());
+        List<ListingRollbackActionType> types = 
getListingRollbackType(instantToRollback, table, partitionPath);
+        for (ListingRollbackActionType type : types) {
+          switch (type) {
+            case DELETE_DATA_FILES_ONLY:
+              final FileStatus[] dataFilesToDeletedStatus = 
getBaseFilesToBeDeleted(metaClient, config, instantToRollback.getTimestamp(),
+                  partitionPath, metaClient.getFs());
+              List<String> dataFilesToBeDeleted = 
Arrays.stream(dataFilesToDeletedStatus).map(fileStatus -> {
+                String dataFileToBeDeleted = fileStatus.getPath().toString();
+                // strip scheme
+                return 
dataFileToBeDeleted.substring(dataFileToBeDeleted.indexOf(":") + 1);
+              }).collect(Collectors.toList());
+              hoodieRollbackRequests.add(new 
HoodieRollbackRequest(partitionPath,
+                  EMPTY_STRING, EMPTY_STRING, dataFilesToBeDeleted, 
Collections.emptyMap()));
+              break;
+            case DELETE_DATA_AND_LOG_FILES:
+              final FileStatus[] filesToDeletedStatus = 
getBaseAndLogFilesToBeDeleted(metaClient, instantToRollback, partitionPath);
+              List<String> filesToBeDeleted = 
Arrays.stream(filesToDeletedStatus).map(fileStatus -> {
+                String fileToBeDeleted = fileStatus.getPath().toString();
+                // strip scheme
+                return fileToBeDeleted.substring(fileToBeDeleted.indexOf(":") 
+ 1);
+              }).collect(Collectors.toList());
+              hoodieRollbackRequests.add(new HoodieRollbackRequest(
+                  partitionPath, EMPTY_STRING, EMPTY_STRING, filesToBeDeleted, 
Collections.emptyMap()));
+              break;
+            case APPEND_ROLLBACK_BLOCK:
+              HoodieCommitMetadata commitMetadata = 
HoodieCommitMetadata.fromBytes(
+                  
table.getMetaClient().getCommitTimeline().getInstantDetails(instantToRollback).get(),
+                  HoodieCommitMetadata.class);
+
+              
hoodieRollbackRequests.addAll(getRollbackRequestForAppend(partitionPath, 
instantToRollback, commitMetadata, table));
+              break;
+            default:
+              break;
+          }
+        }
+        return hoodieRollbackRequests.stream();
+      }, numPartitions);
+    } catch (Exception e) {
       LOG.error("Generating rollback requests failed for " + 
instantToRollback.getTimestamp(), e);
       throw new HoodieRollbackException("Generating rollback requests failed 
for " + instantToRollback.getTimestamp(), e);
     }
   }
+
+  private FileStatus[] getBaseFilesToBeDeleted(HoodieTableMetaClient 
metaClient, HoodieWriteConfig config,
+                                               String commit, String 
partitionPath, FileSystem fs) throws IOException {
+    LOG.info("Collecting files to be cleaned/rolledback up for path " + 
partitionPath + " and commit " + commit);
+    String basefileExtension = 
metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
+    PathFilter filter = (path) -> {
+      if (path.toString().contains(basefileExtension)) {
+        String fileCommitTime = FSUtils.getCommitTime(path.getName());
+        return commit.equals(fileCommitTime);
+      }
+      return false;
+    };
+    return fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), 
partitionPath), filter);
+  }
+
+  private FileStatus[] getBaseAndLogFilesToBeDeleted(HoodieTableMetaClient 
metaClient, HoodieInstant instantToRollback, String partitionPath) throws 
IOException {
+    HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
+    HoodieTimeline timeline = 
activeTimeline.getCommitsTimeline().filterCompletedInstants();
+    Option<HoodieCommitMetadata> commitMetadataOptional = 
getHoodieCommitMetadata(timeline, instantToRollback);
+    SerializablePathFilter pathFilter = getSerializablePathFilter(metaClient, 
instantToRollback.getTimestamp());
+    Path[] filePaths;
+    if (commitMetadataOptional.isPresent() && instantToRollback.isCompleted()
+        && 
!WriteOperationType.UNKNOWN.equals(commitMetadataOptional.get().getOperationType()))
 {
+      filePaths = getFilesFromCommitMetadata(metaClient, 
commitMetadataOptional.get(), partitionPath);
+    } else {
+      filePaths = listFilesToBeDeleted(metaClient, partitionPath);
+    }
+
+    return metaClient.getFs().listStatus(filePaths, pathFilter);
+  }
+
+  private static Path[] listFilesToBeDeleted(HoodieTableMetaClient metaClient, 
String partitionPath) {
+    return new Path[]{FSUtils.getPartitionPath(metaClient.getBasePath(), 
partitionPath)};
+  }
+
+  private static Path[] getFilesFromCommitMetadata(HoodieTableMetaClient 
metaClient, HoodieCommitMetadata commitMetadata, String partitionPath) {

Review comment:
      Same as above: only the base path is used here, so there's no need to pass the whole `metaClient`.

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
##########
@@ -57,20 +84,242 @@ public ListingBasedRollbackStrategy(HoodieTable table,
   @Override
   public List<HoodieRollbackRequest> getRollbackRequests(HoodieInstant 
instantToRollback) {
     try {
-      List<ListingBasedRollbackRequest> rollbackRequests = null;
-      if (table.getMetaClient().getTableType() == 
HoodieTableType.COPY_ON_WRITE) {
-        rollbackRequests = 
RollbackUtils.generateRollbackRequestsByListingCOW(context,
-            table.getMetaClient().getBasePath());
-      } else {
-        rollbackRequests = RollbackUtils
-            .generateRollbackRequestsUsingFileListingMOR(instantToRollback, 
table, context);
-      }
-      List<HoodieRollbackRequest> listingBasedRollbackRequests = new 
ListingBasedRollbackHelper(table.getMetaClient(), config)
-          .getRollbackRequestsForRollbackPlan(context, instantToRollback, 
rollbackRequests);
-      return listingBasedRollbackRequests;
-    } catch (IOException e) {
+      HoodieTableMetaClient metaClient = table.getMetaClient();
+      List<String> partitionPaths = FSUtils.getAllPartitionPaths(context, 
table.getMetaClient().getBasePath(), false, false);
+      int numPartitions = Math.max(Math.min(partitionPaths.size(), 
config.getRollbackParallelism()), 1);
+
+      context.setJobStatus(this.getClass().getSimpleName(), "Creating Listing 
Rollback Plan");
+
+      return context.flatMap(partitionPaths, partitionPath -> {
+        List<HoodieRollbackRequest> hoodieRollbackRequests = new 
ArrayList<>(partitionPaths.size());
+        List<ListingRollbackActionType> types = 
getListingRollbackType(instantToRollback, table, partitionPath);
+        for (ListingRollbackActionType type : types) {
+          switch (type) {
+            case DELETE_DATA_FILES_ONLY:
+              final FileStatus[] dataFilesToDeletedStatus = 
getBaseFilesToBeDeleted(metaClient, config, instantToRollback.getTimestamp(),
+                  partitionPath, metaClient.getFs());
+              List<String> dataFilesToBeDeleted = 
Arrays.stream(dataFilesToDeletedStatus).map(fileStatus -> {
+                String dataFileToBeDeleted = fileStatus.getPath().toString();
+                // strip scheme
+                return 
dataFileToBeDeleted.substring(dataFileToBeDeleted.indexOf(":") + 1);
+              }).collect(Collectors.toList());
+              hoodieRollbackRequests.add(new 
HoodieRollbackRequest(partitionPath,
+                  EMPTY_STRING, EMPTY_STRING, dataFilesToBeDeleted, 
Collections.emptyMap()));
+              break;
+            case DELETE_DATA_AND_LOG_FILES:
+              final FileStatus[] filesToDeletedStatus = 
getBaseAndLogFilesToBeDeleted(metaClient, instantToRollback, partitionPath);
+              List<String> filesToBeDeleted = 
Arrays.stream(filesToDeletedStatus).map(fileStatus -> {
+                String fileToBeDeleted = fileStatus.getPath().toString();
+                // strip scheme
+                return fileToBeDeleted.substring(fileToBeDeleted.indexOf(":") 
+ 1);
+              }).collect(Collectors.toList());
+              hoodieRollbackRequests.add(new HoodieRollbackRequest(
+                  partitionPath, EMPTY_STRING, EMPTY_STRING, filesToBeDeleted, 
Collections.emptyMap()));
+              break;
+            case APPEND_ROLLBACK_BLOCK:
+              HoodieCommitMetadata commitMetadata = 
HoodieCommitMetadata.fromBytes(
+                  
table.getMetaClient().getCommitTimeline().getInstantDetails(instantToRollback).get(),
+                  HoodieCommitMetadata.class);
+
+              
hoodieRollbackRequests.addAll(getRollbackRequestForAppend(partitionPath, 
instantToRollback, commitMetadata, table));
+              break;
+            default:
+              break;
+          }
+        }
+        return hoodieRollbackRequests.stream();
+      }, numPartitions);
+    } catch (Exception e) {
       LOG.error("Generating rollback requests failed for " + 
instantToRollback.getTimestamp(), e);
       throw new HoodieRollbackException("Generating rollback requests failed 
for " + instantToRollback.getTimestamp(), e);
     }
   }
+
+  private FileStatus[] getBaseFilesToBeDeleted(HoodieTableMetaClient 
metaClient, HoodieWriteConfig config,
+                                               String commit, String 
partitionPath, FileSystem fs) throws IOException {
+    LOG.info("Collecting files to be cleaned/rolledback up for path " + 
partitionPath + " and commit " + commit);
+    String basefileExtension = 
metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
+    PathFilter filter = (path) -> {
+      if (path.toString().contains(basefileExtension)) {
+        String fileCommitTime = FSUtils.getCommitTime(path.getName());
+        return commit.equals(fileCommitTime);
+      }
+      return false;
+    };
+    return fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), 
partitionPath), filter);
+  }
+
+  private FileStatus[] getBaseAndLogFilesToBeDeleted(HoodieTableMetaClient 
metaClient, HoodieInstant instantToRollback, String partitionPath) throws 
IOException {
+    HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
+    HoodieTimeline timeline = 
activeTimeline.getCommitsTimeline().filterCompletedInstants();
+    Option<HoodieCommitMetadata> commitMetadataOptional = 
getHoodieCommitMetadata(timeline, instantToRollback);
+    SerializablePathFilter pathFilter = getSerializablePathFilter(metaClient, 
instantToRollback.getTimestamp());
+    Path[] filePaths;
+    if (commitMetadataOptional.isPresent() && instantToRollback.isCompleted()
+        && 
!WriteOperationType.UNKNOWN.equals(commitMetadataOptional.get().getOperationType()))
 {
+      filePaths = getFilesFromCommitMetadata(metaClient, 
commitMetadataOptional.get(), partitionPath);
+    } else {
+      filePaths = listFilesToBeDeleted(metaClient, partitionPath);
+    }
+
+    return metaClient.getFs().listStatus(filePaths, pathFilter);
+  }
+
+  private static Path[] listFilesToBeDeleted(HoodieTableMetaClient metaClient, 
String partitionPath) {
+    return new Path[]{FSUtils.getPartitionPath(metaClient.getBasePath(), 
partitionPath)};
+  }
+
+  private static Path[] getFilesFromCommitMetadata(HoodieTableMetaClient 
metaClient, HoodieCommitMetadata commitMetadata, String partitionPath) {
+    List<String> fullPaths = 
commitMetadata.getFullPathsByPartitionPath(metaClient.getBasePath(), 
partitionPath);
+    return fullPaths.stream().map(Path::new).toArray(Path[]::new);
+  }
+
+  @NotNull
+  private static SerializablePathFilter 
getSerializablePathFilter(HoodieTableMetaClient metaClient, String commit) {
+    String basefileExtension = 
metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
+    return (path) -> {
+      if (path.toString().endsWith(basefileExtension)) {
+        String fileCommitTime = FSUtils.getCommitTime(path.getName());
+        return commit.equals(fileCommitTime);
+      } else if (FSUtils.isLogFile(path)) {
+        // Since the baseCommitTime is the only commit for new log files, it's 
okay here
+        String fileCommitTime = FSUtils.getBaseCommitTimeFromLogPath(path);
+        return commit.equals(fileCommitTime);
+      }
+      return false;
+    };
+  }
+
+  private static List<ListingRollbackActionType> 
getListingRollbackType(HoodieInstant instantToRollback,
+                                                                        
HoodieTable<?, ?, ?, ?> table, String partitionPath) throws IOException {
+    List<ListingRollbackActionType> listingRollbackActionTypes = new 
ArrayList<>(2);
+    if (table.getMetaClient().getTableType() == HoodieTableType.COPY_ON_WRITE) 
{
+      
listingRollbackActionTypes.add(ListingRollbackActionType.DELETE_DATA_AND_LOG_FILES);
+    } else {
+      String commit = instantToRollback.getTimestamp();
+      HoodieActiveTimeline activeTimeline = 
table.getMetaClient().reloadActiveTimeline();
+      switch (instantToRollback.getAction()) {
+        case HoodieTimeline.COMMIT_ACTION:
+        case HoodieTimeline.REPLACE_COMMIT_ACTION:
+          
listingRollbackActionTypes.add(ListingRollbackActionType.DELETE_DATA_AND_LOG_FILES);

Review comment:
      Please apply the same change in all branches.

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
##########
@@ -57,20 +84,242 @@ public ListingBasedRollbackStrategy(HoodieTable table,
   @Override
   public List<HoodieRollbackRequest> getRollbackRequests(HoodieInstant 
instantToRollback) {
     try {
-      List<ListingBasedRollbackRequest> rollbackRequests = null;
-      if (table.getMetaClient().getTableType() == 
HoodieTableType.COPY_ON_WRITE) {
-        rollbackRequests = 
RollbackUtils.generateRollbackRequestsByListingCOW(context,
-            table.getMetaClient().getBasePath());
-      } else {
-        rollbackRequests = RollbackUtils
-            .generateRollbackRequestsUsingFileListingMOR(instantToRollback, 
table, context);
-      }
-      List<HoodieRollbackRequest> listingBasedRollbackRequests = new 
ListingBasedRollbackHelper(table.getMetaClient(), config)
-          .getRollbackRequestsForRollbackPlan(context, instantToRollback, 
rollbackRequests);
-      return listingBasedRollbackRequests;
-    } catch (IOException e) {
+      HoodieTableMetaClient metaClient = table.getMetaClient();
+      List<String> partitionPaths = FSUtils.getAllPartitionPaths(context, 
table.getMetaClient().getBasePath(), false, false);
+      int numPartitions = Math.max(Math.min(partitionPaths.size(), 
config.getRollbackParallelism()), 1);
+
+      context.setJobStatus(this.getClass().getSimpleName(), "Creating Listing 
Rollback Plan");
+
+      return context.flatMap(partitionPaths, partitionPath -> {
+        List<HoodieRollbackRequest> hoodieRollbackRequests = new 
ArrayList<>(partitionPaths.size());
+        List<ListingRollbackActionType> types = 
getListingRollbackType(instantToRollback, table, partitionPath);

Review comment:
      We will need to untangle this a bit — I don't think it's 
possible for a single commit to write both base files _and_ log files, so 
rollback should only ever need to handle one or the other.
   
   cc @nsivabalan 

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
##########
@@ -57,20 +84,242 @@ public ListingBasedRollbackStrategy(HoodieTable table,
   @Override
   public List<HoodieRollbackRequest> getRollbackRequests(HoodieInstant 
instantToRollback) {
     try {
-      List<ListingBasedRollbackRequest> rollbackRequests = null;
-      if (table.getMetaClient().getTableType() == 
HoodieTableType.COPY_ON_WRITE) {
-        rollbackRequests = 
RollbackUtils.generateRollbackRequestsByListingCOW(context,
-            table.getMetaClient().getBasePath());
-      } else {
-        rollbackRequests = RollbackUtils
-            .generateRollbackRequestsUsingFileListingMOR(instantToRollback, 
table, context);
-      }
-      List<HoodieRollbackRequest> listingBasedRollbackRequests = new 
ListingBasedRollbackHelper(table.getMetaClient(), config)
-          .getRollbackRequestsForRollbackPlan(context, instantToRollback, 
rollbackRequests);
-      return listingBasedRollbackRequests;
-    } catch (IOException e) {
+      HoodieTableMetaClient metaClient = table.getMetaClient();
+      List<String> partitionPaths = FSUtils.getAllPartitionPaths(context, 
table.getMetaClient().getBasePath(), false, false);
+      int numPartitions = Math.max(Math.min(partitionPaths.size(), 
config.getRollbackParallelism()), 1);
+
+      context.setJobStatus(this.getClass().getSimpleName(), "Creating Listing 
Rollback Plan");
+
+      return context.flatMap(partitionPaths, partitionPath -> {
+        List<HoodieRollbackRequest> hoodieRollbackRequests = new 
ArrayList<>(partitionPaths.size());
+        List<ListingRollbackActionType> types = 
getListingRollbackType(instantToRollback, table, partitionPath);
+        for (ListingRollbackActionType type : types) {
+          switch (type) {
+            case DELETE_DATA_FILES_ONLY:
+              final FileStatus[] dataFilesToDeletedStatus = 
getBaseFilesToBeDeleted(metaClient, config, instantToRollback.getTimestamp(),
+                  partitionPath, metaClient.getFs());
+              List<String> dataFilesToBeDeleted = 
Arrays.stream(dataFilesToDeletedStatus).map(fileStatus -> {
+                String dataFileToBeDeleted = fileStatus.getPath().toString();
+                // strip scheme
+                return 
dataFileToBeDeleted.substring(dataFileToBeDeleted.indexOf(":") + 1);
+              }).collect(Collectors.toList());
+              hoodieRollbackRequests.add(new 
HoodieRollbackRequest(partitionPath,
+                  EMPTY_STRING, EMPTY_STRING, dataFilesToBeDeleted, 
Collections.emptyMap()));
+              break;
+            case DELETE_DATA_AND_LOG_FILES:
+              final FileStatus[] filesToDeletedStatus = 
getBaseAndLogFilesToBeDeleted(metaClient, instantToRollback, partitionPath);
+              List<String> filesToBeDeleted = 
Arrays.stream(filesToDeletedStatus).map(fileStatus -> {
+                String fileToBeDeleted = fileStatus.getPath().toString();
+                // strip scheme
+                return fileToBeDeleted.substring(fileToBeDeleted.indexOf(":") 
+ 1);
+              }).collect(Collectors.toList());
+              hoodieRollbackRequests.add(new HoodieRollbackRequest(
+                  partitionPath, EMPTY_STRING, EMPTY_STRING, filesToBeDeleted, 
Collections.emptyMap()));
+              break;
+            case APPEND_ROLLBACK_BLOCK:
+              HoodieCommitMetadata commitMetadata = 
HoodieCommitMetadata.fromBytes(
+                  
table.getMetaClient().getCommitTimeline().getInstantDetails(instantToRollback).get(),
+                  HoodieCommitMetadata.class);
+
+              
hoodieRollbackRequests.addAll(getRollbackRequestForAppend(partitionPath, 
instantToRollback, commitMetadata, table));
+              break;
+            default:
+              break;
+          }
+        }
+        return hoodieRollbackRequests.stream();
+      }, numPartitions);
+    } catch (Exception e) {
       LOG.error("Generating rollback requests failed for " + 
instantToRollback.getTimestamp(), e);
       throw new HoodieRollbackException("Generating rollback requests failed 
for " + instantToRollback.getTimestamp(), e);
     }
   }
+
+  private FileStatus[] getBaseFilesToBeDeleted(HoodieTableMetaClient 
metaClient, HoodieWriteConfig config,
+                                               String commit, String 
partitionPath, FileSystem fs) throws IOException {
+    LOG.info("Collecting files to be cleaned/rolledback up for path " + 
partitionPath + " and commit " + commit);
+    String basefileExtension = 
metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
+    PathFilter filter = (path) -> {
+      if (path.toString().contains(basefileExtension)) {
+        String fileCommitTime = FSUtils.getCommitTime(path.getName());
+        return commit.equals(fileCommitTime);
+      }
+      return false;
+    };
+    return fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), 
partitionPath), filter);
+  }
+
+  private FileStatus[] getBaseAndLogFilesToBeDeleted(HoodieTableMetaClient 
metaClient, HoodieInstant instantToRollback, String partitionPath) throws 
IOException {
+    HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
+    HoodieTimeline timeline = 
activeTimeline.getCommitsTimeline().filterCompletedInstants();
+    Option<HoodieCommitMetadata> commitMetadataOptional = 
getHoodieCommitMetadata(timeline, instantToRollback);
+    SerializablePathFilter pathFilter = getSerializablePathFilter(metaClient, 
instantToRollback.getTimestamp());
+    Path[] filePaths;
+    if (commitMetadataOptional.isPresent() && instantToRollback.isCompleted()
+        && 
!WriteOperationType.UNKNOWN.equals(commitMetadataOptional.get().getOperationType()))
 {
+      filePaths = getFilesFromCommitMetadata(metaClient, 
commitMetadataOptional.get(), partitionPath);
+    } else {
+      filePaths = listFilesToBeDeleted(metaClient, partitionPath);
+    }
+
+    return metaClient.getFs().listStatus(filePaths, pathFilter);
+  }
+
+  private static Path[] listFilesToBeDeleted(HoodieTableMetaClient metaClient, 
String partitionPath) {
+    return new Path[]{FSUtils.getPartitionPath(metaClient.getBasePath(), 
partitionPath)};
+  }
+
+  private static Path[] getFilesFromCommitMetadata(HoodieTableMetaClient 
metaClient, HoodieCommitMetadata commitMetadata, String partitionPath) {
+    List<String> fullPaths = 
commitMetadata.getFullPathsByPartitionPath(metaClient.getBasePath(), 
partitionPath);
+    return fullPaths.stream().map(Path::new).toArray(Path[]::new);
+  }
+
+  @NotNull
+  private static SerializablePathFilter 
getSerializablePathFilter(HoodieTableMetaClient metaClient, String commit) {

Review comment:
       Please do a holistic evaluation of all the parameters being passed, and 
let's make sure we pass only the necessary payload.

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
##########
@@ -57,20 +84,242 @@ public ListingBasedRollbackStrategy(HoodieTable table,
   @Override
   public List<HoodieRollbackRequest> getRollbackRequests(HoodieInstant 
instantToRollback) {
     try {
-      List<ListingBasedRollbackRequest> rollbackRequests = null;
-      if (table.getMetaClient().getTableType() == 
HoodieTableType.COPY_ON_WRITE) {
-        rollbackRequests = 
RollbackUtils.generateRollbackRequestsByListingCOW(context,
-            table.getMetaClient().getBasePath());
-      } else {
-        rollbackRequests = RollbackUtils
-            .generateRollbackRequestsUsingFileListingMOR(instantToRollback, 
table, context);
-      }
-      List<HoodieRollbackRequest> listingBasedRollbackRequests = new 
ListingBasedRollbackHelper(table.getMetaClient(), config)
-          .getRollbackRequestsForRollbackPlan(context, instantToRollback, 
rollbackRequests);
-      return listingBasedRollbackRequests;
-    } catch (IOException e) {
+      HoodieTableMetaClient metaClient = table.getMetaClient();
+      List<String> partitionPaths = FSUtils.getAllPartitionPaths(context, 
table.getMetaClient().getBasePath(), false, false);
+      int numPartitions = Math.max(Math.min(partitionPaths.size(), 
config.getRollbackParallelism()), 1);
+
+      context.setJobStatus(this.getClass().getSimpleName(), "Creating Listing 
Rollback Plan");
+
+      return context.flatMap(partitionPaths, partitionPath -> {
+        List<HoodieRollbackRequest> hoodieRollbackRequests = new 
ArrayList<>(partitionPaths.size());
+        List<ListingRollbackActionType> types = 
getListingRollbackType(instantToRollback, table, partitionPath);
+        for (ListingRollbackActionType type : types) {
+          switch (type) {
+            case DELETE_DATA_FILES_ONLY:
+              final FileStatus[] dataFilesToDeletedStatus = 
getBaseFilesToBeDeleted(metaClient, config, instantToRollback.getTimestamp(),
+                  partitionPath, metaClient.getFs());
+              List<String> dataFilesToBeDeleted = 
Arrays.stream(dataFilesToDeletedStatus).map(fileStatus -> {
+                String dataFileToBeDeleted = fileStatus.getPath().toString();
+                // strip scheme
+                return 
dataFileToBeDeleted.substring(dataFileToBeDeleted.indexOf(":") + 1);
+              }).collect(Collectors.toList());
+              hoodieRollbackRequests.add(new 
HoodieRollbackRequest(partitionPath,
+                  EMPTY_STRING, EMPTY_STRING, dataFilesToBeDeleted, 
Collections.emptyMap()));
+              break;
+            case DELETE_DATA_AND_LOG_FILES:
+              final FileStatus[] filesToDeletedStatus = 
getBaseAndLogFilesToBeDeleted(metaClient, instantToRollback, partitionPath);
+              List<String> filesToBeDeleted = 
Arrays.stream(filesToDeletedStatus).map(fileStatus -> {
+                String fileToBeDeleted = fileStatus.getPath().toString();
+                // strip scheme
+                return fileToBeDeleted.substring(fileToBeDeleted.indexOf(":") 
+ 1);
+              }).collect(Collectors.toList());
+              hoodieRollbackRequests.add(new HoodieRollbackRequest(
+                  partitionPath, EMPTY_STRING, EMPTY_STRING, filesToBeDeleted, 
Collections.emptyMap()));
+              break;
+            case APPEND_ROLLBACK_BLOCK:
+              HoodieCommitMetadata commitMetadata = 
HoodieCommitMetadata.fromBytes(
+                  
table.getMetaClient().getCommitTimeline().getInstantDetails(instantToRollback).get(),
+                  HoodieCommitMetadata.class);
+
+              
hoodieRollbackRequests.addAll(getRollbackRequestForAppend(partitionPath, 
instantToRollback, commitMetadata, table));
+              break;
+            default:
+              break;
+          }
+        }
+        return hoodieRollbackRequests.stream();
+      }, numPartitions);
+    } catch (Exception e) {
       LOG.error("Generating rollback requests failed for " + 
instantToRollback.getTimestamp(), e);
       throw new HoodieRollbackException("Generating rollback requests failed 
for " + instantToRollback.getTimestamp(), e);
     }
   }
+
+  private FileStatus[] getBaseFilesToBeDeleted(HoodieTableMetaClient 
metaClient, HoodieWriteConfig config,
+                                               String commit, String 
partitionPath, FileSystem fs) throws IOException {
+    LOG.info("Collecting files to be cleaned/rolledback up for path " + 
partitionPath + " and commit " + commit);
+    String basefileExtension = 
metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
+    PathFilter filter = (path) -> {
+      if (path.toString().contains(basefileExtension)) {
+        String fileCommitTime = FSUtils.getCommitTime(path.getName());
+        return commit.equals(fileCommitTime);
+      }
+      return false;
+    };
+    return fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), 
partitionPath), filter);
+  }
+
+  private FileStatus[] getBaseAndLogFilesToBeDeleted(HoodieTableMetaClient 
metaClient, HoodieInstant instantToRollback, String partitionPath) throws 
IOException {
+    HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
+    HoodieTimeline timeline = 
activeTimeline.getCommitsTimeline().filterCompletedInstants();
+    Option<HoodieCommitMetadata> commitMetadataOptional = 
getHoodieCommitMetadata(timeline, instantToRollback);
+    SerializablePathFilter pathFilter = getSerializablePathFilter(metaClient, 
instantToRollback.getTimestamp());
+    Path[] filePaths;
+    if (commitMetadataOptional.isPresent() && instantToRollback.isCompleted()
+        && 
!WriteOperationType.UNKNOWN.equals(commitMetadataOptional.get().getOperationType()))
 {
+      filePaths = getFilesFromCommitMetadata(metaClient, 
commitMetadataOptional.get(), partitionPath);
+    } else {
+      filePaths = listFilesToBeDeleted(metaClient, partitionPath);
+    }
+
+    return metaClient.getFs().listStatus(filePaths, pathFilter);
+  }
+
+  private static Path[] listFilesToBeDeleted(HoodieTableMetaClient metaClient, 
String partitionPath) {
+    return new Path[]{FSUtils.getPartitionPath(metaClient.getBasePath(), 
partitionPath)};
+  }
+
+  private static Path[] getFilesFromCommitMetadata(HoodieTableMetaClient 
metaClient, HoodieCommitMetadata commitMetadata, String partitionPath) {
+    List<String> fullPaths = 
commitMetadata.getFullPathsByPartitionPath(metaClient.getBasePath(), 
partitionPath);
+    return fullPaths.stream().map(Path::new).toArray(Path[]::new);
+  }
+
+  @NotNull
+  private static SerializablePathFilter 
getSerializablePathFilter(HoodieTableMetaClient metaClient, String commit) {
+    String basefileExtension = 
metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
+    return (path) -> {
+      if (path.toString().endsWith(basefileExtension)) {
+        String fileCommitTime = FSUtils.getCommitTime(path.getName());
+        return commit.equals(fileCommitTime);
+      } else if (FSUtils.isLogFile(path)) {
+        // Since the baseCommitTime is the only commit for new log files, it's 
okay here
+        String fileCommitTime = FSUtils.getBaseCommitTimeFromLogPath(path);
+        return commit.equals(fileCommitTime);
+      }
+      return false;
+    };
+  }
+
+  private static List<ListingRollbackActionType> 
getListingRollbackType(HoodieInstant instantToRollback,
+                                                                        
HoodieTable<?, ?, ?, ?> table, String partitionPath) throws IOException {
+    List<ListingRollbackActionType> listingRollbackActionTypes = new 
ArrayList<>(2);
+    if (table.getMetaClient().getTableType() == HoodieTableType.COPY_ON_WRITE) 
{
+      
listingRollbackActionTypes.add(ListingRollbackActionType.DELETE_DATA_AND_LOG_FILES);
+    } else {
+      String commit = instantToRollback.getTimestamp();
+      HoodieActiveTimeline activeTimeline = 
table.getMetaClient().reloadActiveTimeline();
+      switch (instantToRollback.getAction()) {
+        case HoodieTimeline.COMMIT_ACTION:
+        case HoodieTimeline.REPLACE_COMMIT_ACTION:
+          
listingRollbackActionTypes.add(ListingRollbackActionType.DELETE_DATA_AND_LOG_FILES);

Review comment:
       Let's simplify the control flow and just return here.

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
##########
@@ -57,20 +84,242 @@ public ListingBasedRollbackStrategy(HoodieTable table,
   @Override
   public List<HoodieRollbackRequest> getRollbackRequests(HoodieInstant 
instantToRollback) {
     try {
-      List<ListingBasedRollbackRequest> rollbackRequests = null;
-      if (table.getMetaClient().getTableType() == 
HoodieTableType.COPY_ON_WRITE) {
-        rollbackRequests = 
RollbackUtils.generateRollbackRequestsByListingCOW(context,
-            table.getMetaClient().getBasePath());
-      } else {
-        rollbackRequests = RollbackUtils
-            .generateRollbackRequestsUsingFileListingMOR(instantToRollback, 
table, context);
-      }
-      List<HoodieRollbackRequest> listingBasedRollbackRequests = new 
ListingBasedRollbackHelper(table.getMetaClient(), config)
-          .getRollbackRequestsForRollbackPlan(context, instantToRollback, 
rollbackRequests);
-      return listingBasedRollbackRequests;
-    } catch (IOException e) {
+      HoodieTableMetaClient metaClient = table.getMetaClient();
+      List<String> partitionPaths = FSUtils.getAllPartitionPaths(context, 
table.getMetaClient().getBasePath(), false, false);
+      int numPartitions = Math.max(Math.min(partitionPaths.size(), 
config.getRollbackParallelism()), 1);
+
+      context.setJobStatus(this.getClass().getSimpleName(), "Creating Listing 
Rollback Plan");
+
+      return context.flatMap(partitionPaths, partitionPath -> {
+        List<HoodieRollbackRequest> hoodieRollbackRequests = new 
ArrayList<>(partitionPaths.size());
+        List<ListingRollbackActionType> types = 
getListingRollbackType(instantToRollback, table, partitionPath);
+        for (ListingRollbackActionType type : types) {
+          switch (type) {
+            case DELETE_DATA_FILES_ONLY:
+              final FileStatus[] dataFilesToDeletedStatus = 
getBaseFilesToBeDeleted(metaClient, config, instantToRollback.getTimestamp(),
+                  partitionPath, metaClient.getFs());
+              List<String> dataFilesToBeDeleted = 
Arrays.stream(dataFilesToDeletedStatus).map(fileStatus -> {
+                String dataFileToBeDeleted = fileStatus.getPath().toString();
+                // strip scheme
+                return 
dataFileToBeDeleted.substring(dataFileToBeDeleted.indexOf(":") + 1);
+              }).collect(Collectors.toList());
+              hoodieRollbackRequests.add(new 
HoodieRollbackRequest(partitionPath,
+                  EMPTY_STRING, EMPTY_STRING, dataFilesToBeDeleted, 
Collections.emptyMap()));
+              break;
+            case DELETE_DATA_AND_LOG_FILES:
+              final FileStatus[] filesToDeletedStatus = 
getBaseAndLogFilesToBeDeleted(metaClient, instantToRollback, partitionPath);
+              List<String> filesToBeDeleted = 
Arrays.stream(filesToDeletedStatus).map(fileStatus -> {
+                String fileToBeDeleted = fileStatus.getPath().toString();
+                // strip scheme
+                return fileToBeDeleted.substring(fileToBeDeleted.indexOf(":") 
+ 1);
+              }).collect(Collectors.toList());
+              hoodieRollbackRequests.add(new HoodieRollbackRequest(
+                  partitionPath, EMPTY_STRING, EMPTY_STRING, filesToBeDeleted, 
Collections.emptyMap()));
+              break;
+            case APPEND_ROLLBACK_BLOCK:
+              HoodieCommitMetadata commitMetadata = 
HoodieCommitMetadata.fromBytes(
+                  
table.getMetaClient().getCommitTimeline().getInstantDetails(instantToRollback).get(),
+                  HoodieCommitMetadata.class);
+
+              
hoodieRollbackRequests.addAll(getRollbackRequestForAppend(partitionPath, 
instantToRollback, commitMetadata, table));
+              break;
+            default:
+              break;
+          }
+        }
+        return hoodieRollbackRequests.stream();
+      }, numPartitions);
+    } catch (Exception e) {
       LOG.error("Generating rollback requests failed for " + 
instantToRollback.getTimestamp(), e);
       throw new HoodieRollbackException("Generating rollback requests failed 
for " + instantToRollback.getTimestamp(), e);
     }
   }
+
+  private FileStatus[] getBaseFilesToBeDeleted(HoodieTableMetaClient 
metaClient, HoodieWriteConfig config,
+                                               String commit, String 
partitionPath, FileSystem fs) throws IOException {
+    LOG.info("Collecting files to be cleaned/rolledback up for path " + 
partitionPath + " and commit " + commit);
+    String basefileExtension = 
metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
+    PathFilter filter = (path) -> {
+      if (path.toString().contains(basefileExtension)) {
+        String fileCommitTime = FSUtils.getCommitTime(path.getName());
+        return commit.equals(fileCommitTime);
+      }
+      return false;
+    };
+    return fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), 
partitionPath), filter);
+  }
+
+  private FileStatus[] getBaseAndLogFilesToBeDeleted(HoodieTableMetaClient 
metaClient, HoodieInstant instantToRollback, String partitionPath) throws 
IOException {
+    HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
+    HoodieTimeline timeline = 
activeTimeline.getCommitsTimeline().filterCompletedInstants();
+    Option<HoodieCommitMetadata> commitMetadataOptional = 
getHoodieCommitMetadata(timeline, instantToRollback);
+    SerializablePathFilter pathFilter = getSerializablePathFilter(metaClient, 
instantToRollback.getTimestamp());
+    Path[] filePaths;
+    if (commitMetadataOptional.isPresent() && instantToRollback.isCompleted()
+        && 
!WriteOperationType.UNKNOWN.equals(commitMetadataOptional.get().getOperationType()))
 {
+      filePaths = getFilesFromCommitMetadata(metaClient, 
commitMetadataOptional.get(), partitionPath);
+    } else {
+      filePaths = listFilesToBeDeleted(metaClient, partitionPath);
+    }
+
+    return metaClient.getFs().listStatus(filePaths, pathFilter);
+  }
+
+  private static Path[] listFilesToBeDeleted(HoodieTableMetaClient metaClient, 
String partitionPath) {
+    return new Path[]{FSUtils.getPartitionPath(metaClient.getBasePath(), 
partitionPath)};
+  }
+
+  private static Path[] getFilesFromCommitMetadata(HoodieTableMetaClient 
metaClient, HoodieCommitMetadata commitMetadata, String partitionPath) {
+    List<String> fullPaths = 
commitMetadata.getFullPathsByPartitionPath(metaClient.getBasePath(), 
partitionPath);
+    return fullPaths.stream().map(Path::new).toArray(Path[]::new);
+  }
+
+  @NotNull
+  private static SerializablePathFilter 
getSerializablePathFilter(HoodieTableMetaClient metaClient, String commit) {
+    String basefileExtension = 
metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
+    return (path) -> {
+      if (path.toString().endsWith(basefileExtension)) {
+        String fileCommitTime = FSUtils.getCommitTime(path.getName());
+        return commit.equals(fileCommitTime);
+      } else if (FSUtils.isLogFile(path)) {
+        // Since the baseCommitTime is the only commit for new log files, it's 
okay here
+        String fileCommitTime = FSUtils.getBaseCommitTimeFromLogPath(path);
+        return commit.equals(fileCommitTime);
+      }
+      return false;
+    };
+  }
+
+  private static List<ListingRollbackActionType> 
getListingRollbackType(HoodieInstant instantToRollback,
+                                                                        
HoodieTable<?, ?, ?, ?> table, String partitionPath) throws IOException {
+    List<ListingRollbackActionType> listingRollbackActionTypes = new 
ArrayList<>(2);
+    if (table.getMetaClient().getTableType() == HoodieTableType.COPY_ON_WRITE) 
{
+      
listingRollbackActionTypes.add(ListingRollbackActionType.DELETE_DATA_AND_LOG_FILES);
+    } else {
+      String commit = instantToRollback.getTimestamp();
+      HoodieActiveTimeline activeTimeline = 
table.getMetaClient().reloadActiveTimeline();
+      switch (instantToRollback.getAction()) {
+        case HoodieTimeline.COMMIT_ACTION:
+        case HoodieTimeline.REPLACE_COMMIT_ACTION:
+          
listingRollbackActionTypes.add(ListingRollbackActionType.DELETE_DATA_AND_LOG_FILES);
+          break;
+        case HoodieTimeline.COMPACTION_ACTION:
+          // If there is no delta commit present after the current commit (if 
compaction), no action, else we
+          // need to make sure that a compaction commit rollback also deletes 
any log files written as part of the
+          // succeeding deltacommit.
+          boolean higherDeltaCommits =
+              
!activeTimeline.getDeltaCommitTimeline().filterCompletedInstants().findInstantsAfter(commit,
 1).empty();
+          if (higherDeltaCommits) {
+            // Rollback of a compaction action with no higher deltacommit 
means that the compaction is scheduled
+            // and has not yet finished. In this scenario we should delete 
only the newly created base files
+            // and not corresponding base commit log files created with this 
as baseCommit since updates would
+            // have been written to the log files.
+            
listingRollbackActionTypes.add(ListingRollbackActionType.DELETE_DATA_FILES_ONLY);
+          } else {
+            // No deltacommits present after this compaction commit (inflight 
or requested). In this case, we
+            // can also delete any log files that were created with this 
compaction commit as base
+            // commit.
+            
listingRollbackActionTypes.add(ListingRollbackActionType.DELETE_DATA_AND_LOG_FILES);
+          }
+          break;
+        case HoodieTimeline.DELTA_COMMIT_ACTION:
+          // 
--------------------------------------------------------------------------------------------------
+          // (A) The following cases are possible if index.canIndexLogFiles 
and/or index.isGlobal

Review comment:
       Note to self: we need to review and update these comments, as they seem 
to be out of sync with the code.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to