vinothchandar commented on a change in pull request #1858:
URL: https://github.com/apache/hudi/pull/1858#discussion_r461265990



##########
File path: 
hudi-client/src/main/java/org/apache/hudi/table/action/rollback/CopyOnWriteRollbackActionExecutor.java
##########
@@ -100,8 +108,13 @@ public CopyOnWriteRollbackActionExecutor(JavaSparkContext 
jsc,
   }
 
   @Override
-  protected List<HoodieRollbackStat> 
executeRollbackUsingFileListing(HoodieInstant instantToRollback) {
+  protected List<HoodieRollbackStat> 
executeRollbackUsingFileListing(HoodieInstant instantToRollback, boolean 
doDelete) {
     List<ListingBasedRollbackRequest> rollbackRequests = 
generateRollbackRequestsByListing();
-    return new ListingBasedRollbackHelper(table.getMetaClient(), 
config).performRollback(jsc, instantToRollback, rollbackRequests);
+    ListingBasedRollbackHelper listingBasedRollbackHelper = new 
ListingBasedRollbackHelper(table.getMetaClient(), config);
+    if(doDelete) {
+      return listingBasedRollbackHelper.performRollback(jsc, 
instantToRollback, rollbackRequests);

Review comment:
       As discussed, we can just call `collectRollbackStats` directly, 
assuming the listing-based rollback strategy.

##########
File path: 
hudi-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java
##########
@@ -68,34 +69,38 @@ public ListingBasedRollbackHelper(HoodieTableMetaClient 
metaClient, HoodieWriteC
    * Performs all rollback actions that we have collected in parallel.
    */
   public List<HoodieRollbackStat> performRollback(JavaSparkContext jsc, 
HoodieInstant instantToRollback, List<ListingBasedRollbackRequest> 
rollbackRequests) {
-    SerializablePathFilter filter = (path) -> {

Review comment:
       ack

##########
File path: 
hudi-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java
##########
@@ -130,39 +137,55 @@ public ListingBasedRollbackHelper(HoodieTableMetaClient 
metaClient, HoodieWriteC
               1L
           );
           return new Tuple2<>(rollbackRequest.getPartitionPath(),
-                  
HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
-                          
.withRollbackBlockAppendResults(filesToNumBlocksRollback).build());
+              
HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
+                  
.withRollbackBlockAppendResults(filesToNumBlocksRollback).build());
         }
         default:
           throw new IllegalStateException("Unknown Rollback action " + 
rollbackRequest);
       }
-    }).reduceByKey(RollbackUtils::mergeRollbackStat).map(Tuple2::_2).collect();
+    });
   }
 
 
-
   /**
    * Common method used for cleaning out base files under a partition path 
during rollback of a set of commits.
    */
-  private Map<FileStatus, Boolean> deleteCleanedFiles(HoodieTableMetaClient 
metaClient, HoodieWriteConfig config,
-                                                      String partitionPath, 
PathFilter filter) throws IOException {
+  private Map<FileStatus, Boolean> deleteBaseAndLogFiles(HoodieTableMetaClient 
metaClient, HoodieWriteConfig config,

Review comment:
       It's hard, but wouldn't MERGE work for both in terms of actually 
performing a correct rollback?

##########
File path: 
hudi-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java
##########
@@ -130,39 +137,55 @@ public ListingBasedRollbackHelper(HoodieTableMetaClient 
metaClient, HoodieWriteC
               1L
           );
           return new Tuple2<>(rollbackRequest.getPartitionPath(),
-                  
HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
-                          
.withRollbackBlockAppendResults(filesToNumBlocksRollback).build());
+              
HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
+                  
.withRollbackBlockAppendResults(filesToNumBlocksRollback).build());
         }
         default:
           throw new IllegalStateException("Unknown Rollback action " + 
rollbackRequest);
       }
-    }).reduceByKey(RollbackUtils::mergeRollbackStat).map(Tuple2::_2).collect();
+    });
   }
 
 
-
   /**
    * Common method used for cleaning out base files under a partition path 
during rollback of a set of commits.
    */
-  private Map<FileStatus, Boolean> deleteCleanedFiles(HoodieTableMetaClient 
metaClient, HoodieWriteConfig config,
-                                                      String partitionPath, 
PathFilter filter) throws IOException {
+  private Map<FileStatus, Boolean> deleteBaseAndLogFiles(HoodieTableMetaClient 
metaClient, HoodieWriteConfig config,
+      String commit, String partitionPath, boolean doDelete) throws 
IOException {
     LOG.info("Cleaning path " + partitionPath);
+    String basefileExtension = 
metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
+    SerializablePathFilter filter = (path) -> {
+      if (path.toString().endsWith(basefileExtension)) {
+        String fileCommitTime = FSUtils.getCommitTime(path.getName());
+        return commit.equals(fileCommitTime);
+      } else if (FSUtils.isLogFile(path)) {
+        // Since the baseCommitTime is the only commit for new log files, it's 
okay here
+        String fileCommitTime = FSUtils.getBaseCommitTimeFromLogPath(path);
+        return commit.equals(fileCommitTime);
+      }
+      return false;
+    };
+
     final Map<FileStatus, Boolean> results = new HashMap<>();
     FileSystem fs = metaClient.getFs();
     FileStatus[] toBeDeleted = 
fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), 
filter);
     for (FileStatus file : toBeDeleted) {
-      boolean success = fs.delete(file.getPath(), false);
-      results.put(file, success);
-      LOG.info("Delete file " + file.getPath() + "\t" + success);
+      if(doDelete) {
+        boolean success = fs.delete(file.getPath(), false);
+        results.put(file, success);
+        LOG.info("Delete file " + file.getPath() + "\t" + success);
+      } else{
+        results.put(file, true);

Review comment:
       Let's add a test that has an inflight commit with a few marker files 
deleted, and then show that the code can correctly upgrade and perform a 
rollback using the marker-based strategy.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to