nsivabalan commented on a change in pull request #3651:
URL: https://github.com/apache/hudi/pull/3651#discussion_r709251029



##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/AbstractMarkerBasedRollbackStrategy.java
##########
@@ -65,83 +63,32 @@ public AbstractMarkerBasedRollbackStrategy(HoodieTable<T, 
I, K, O> table, Hoodie
     this.instantTime = instantTime;
   }
 
-  protected HoodieRollbackStat undoMerge(String mergedBaseFilePath) throws 
IOException {
-    LOG.info("Rolling back by deleting the merged base file:" + 
mergedBaseFilePath);
-    return deleteBaseFile(mergedBaseFilePath);
-  }
-
-  protected HoodieRollbackStat undoCreate(String createdBaseFilePath) throws 
IOException {
-    LOG.info("Rolling back by deleting the created base file:" + 
createdBaseFilePath);
-    return deleteBaseFile(createdBaseFilePath);
-  }
-
-  private HoodieRollbackStat deleteBaseFile(String baseFilePath) throws 
IOException {
-    Path fullDeletePath = new Path(basePath, baseFilePath);
-    String partitionPath = FSUtils.getRelativePartitionPath(new 
Path(basePath), fullDeletePath.getParent());
-    boolean isDeleted = table.getMetaClient().getFs().delete(fullDeletePath);
-    return HoodieRollbackStat.newBuilder()
-        .withPartitionPath(partitionPath)
-        .withDeletedFileResult(baseFilePath, isDeleted)
-        .build();
-  }
-
-  protected HoodieRollbackStat undoAppend(String appendBaseFilePath, 
HoodieInstant instantToRollback) throws IOException, InterruptedException {
+  protected HoodieRollbackRequest getRollbackRequestForAppend(String 
appendBaseFilePath) throws IOException {
     Path baseFilePathForAppend = new Path(basePath, appendBaseFilePath);
     String fileId = FSUtils.getFileIdFromFilePath(baseFilePathForAppend);
     String baseCommitTime = 
FSUtils.getCommitTime(baseFilePathForAppend.getName());
     String partitionPath = FSUtils.getRelativePartitionPath(new 
Path(basePath), new Path(basePath, appendBaseFilePath).getParent());
-    final Map<FileStatus, Long> writtenLogFileSizeMap = 
getWrittenLogFileSizeMap(partitionPath, baseCommitTime, fileId);
-
-    HoodieLogFormat.Writer writer = null;
-    try {
-      Path partitionFullPath = FSUtils.getPartitionPath(basePath, 
partitionPath);
-
-      if (!table.getMetaClient().getFs().exists(partitionFullPath)) {
-        return HoodieRollbackStat.newBuilder()
-            .withPartitionPath(partitionPath)
-            .build();
-      }
-      writer = HoodieLogFormat.newWriterBuilder()
-          .onParentPath(partitionFullPath)
-          .withFileId(fileId)
-          .overBaseCommit(baseCommitTime)
-          .withFs(table.getMetaClient().getFs())
-          .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
-
-      // generate metadata
-      Map<HoodieLogBlock.HeaderMetadataType, String> header = 
RollbackUtils.generateHeader(instantToRollback.getTimestamp(), instantTime);
-      // if update belongs to an existing log file
-      writer.appendBlock(new HoodieCommandBlock(header));
-    } finally {
-      try {
-        if (writer != null) {
-          writer.close();
-        }
-      } catch (IOException io) {
-        throw new HoodieIOException("Error closing append of rollback 
block..", io);
-      }
+    Map<FileStatus, Long> writtenLogFileSizeMap = 
getWrittenLogFileSizeMap(partitionPath, baseCommitTime, fileId);
+    Map<String, Long> writtenLogFileStrSizeMap = new HashMap<>();
+    for (Map.Entry<FileStatus, Long> entry : writtenLogFileSizeMap.entrySet()) 
{
+      writtenLogFileStrSizeMap.put(entry.getKey().getPath().toString(), 
entry.getValue());
     }
-
-    // the information of files appended to is required for metadata sync
-    Map<FileStatus, Long> filesToNumBlocksRollback = Collections.singletonMap(
-          
table.getMetaClient().getFs().getFileStatus(Objects.requireNonNull(writer).getLogFile().getPath()),
-          1L);
-
-    return HoodieRollbackStat.newBuilder()
-        .withPartitionPath(partitionPath)
-        .withRollbackBlockAppendResults(filesToNumBlocksRollback)
-        .withWrittenLogFileSizeMap(writtenLogFileSizeMap).build();
+    return new HoodieRollbackRequest(partitionPath, fileId, baseCommitTime, 
Collections.emptyList(), writtenLogFileStrSizeMap);
   }
 
   /**
    * Returns written log file size map for the respective baseCommitTime to 
assist in metadata table syncing.
-   * @param partitionPath partition path of interest
+   *
+   * @param partitionPath  partition path of interest
    * @param baseCommitTime base commit time of interest
-   * @param fileId fileId of interest
+   * @param fileId         fileId of interest
    * @return Map<FileStatus, File size>
    * @throws IOException
    */
   protected Map<FileStatus, Long> getWrittenLogFileSizeMap(String 
partitionPath, String baseCommitTime, String fileId) throws IOException {
-    return Collections.EMPTY_MAP;
+    // collect all log files that is supposed to be deleted with this rollback
+    return FSUtils.getAllLogFiles(table.getMetaClient().getFs(),

Review comment:
       I don't fully get this point. I have created a follow-up ticket 
[here](https://issues.apache.org/jira/browse/HUDI-2436) and will follow up with 
you. 
   Here is my understanding of a scenario using cloud stores that do not 
support append. 
   
   If there was a crash during a commit, then when listing log files to be logged, the 
last one (from the crashed write) may not be part of the rollback plan. But that 
should be fine: since it is not available via listing, I assume it will not be visible 
during compaction either. We will proceed with the rollback 
by adding another log block (file), and this will get replayed to the metadata 
table. 
   
   If you are talking about the case where a crash happens while the rollback 
itself is being logged, just before committing to the metadata table: 
       we should be OK here too. We will retry the rollback, which will redo the 
action phase and add new log blocks (covering the same old log files that were part of 
the failed writes, except that it may not be able to delete them successfully). This 
will then get applied to the metadata table. We just have to ensure that when applying 
changes to the metadata table, we consider all files from the plan and not just the 
ones that were successfully deleted. 
   
    
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to