nsivabalan commented on a change in pull request #4913:
URL: https://github.com/apache/hudi/pull/4913#discussion_r824879591



##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java
##########
@@ -73,54 +80,110 @@ public MarkerBasedRollbackStrategy(HoodieTable<?, ?, ?, ?> 
table, HoodieEngineCo
       List<String> markerPaths = MarkerBasedRollbackUtils.getAllMarkerPaths(
           table, context, instantToRollback.getTimestamp(), 
config.getRollbackParallelism());
       int parallelism = Math.max(Math.min(markerPaths.size(), 
config.getRollbackParallelism()), 1);
-      return context.map(markerPaths, markerFilePath -> {
-        String typeStr = 
markerFilePath.substring(markerFilePath.lastIndexOf(".") + 1);
-        IOType type = IOType.valueOf(typeStr);
-        switch (type) {
-          case MERGE:
-          case CREATE:
-            String fileToDelete = 
WriteMarkers.stripMarkerSuffix(markerFilePath);
-            Path fullDeletePath = new Path(basePath, fileToDelete);
-            String partitionPath = FSUtils.getRelativePartitionPath(new 
Path(basePath), fullDeletePath.getParent());
-            return new HoodieRollbackRequest(partitionPath, EMPTY_STRING, 
EMPTY_STRING,
-                Collections.singletonList(fullDeletePath.toString()),
-                Collections.emptyMap());
-          case APPEND:
-            // NOTE: This marker file-path does NOT correspond to a log-file, 
but rather is a phony
-            //       path serving as a "container" for the following 
components:
-            //          - Base file's file-id
-            //          - Base file's commit instant
-            //          - Partition path
-            return 
getRollbackRequestForAppend(WriteMarkers.stripMarkerSuffix(markerFilePath));
-          default:
-            throw new HoodieRollbackException("Unknown marker type, during 
rollback of " + instantToRollback);
-        }
-      }, parallelism);
+      return context.mapToPairAndReduceByKey(markerPaths,
+          // generate rollback request per marker file
+          getRollbackReqGenerateFunction(instantToRollback),
+          // NOTE: Since we're rolling back incomplete Delta Commit, it only 
could have appended its
+          //       block to the latest log-file. But we cannot simply get the 
latest log-file by one marker file.
+          //       So compare log-files in the same fileGroup and get the 
latest one.
+          getRollbackReqCombineFunction(), parallelism);
     } catch (Exception e) {
       throw new HoodieRollbackException("Error rolling back using marker files 
written for " + instantToRollback, e);
     }
   }
 
-  protected HoodieRollbackRequest getRollbackRequestForAppend(String 
markerFilePath) throws IOException {
-    Path baseFilePathForAppend = new Path(basePath, markerFilePath);
-    String fileId = FSUtils.getFileIdFromFilePath(baseFilePathForAppend);
-    String baseCommitTime = 
FSUtils.getCommitTime(baseFilePathForAppend.getName());
-    String relativePartitionPath = FSUtils.getRelativePartitionPath(new 
Path(basePath), baseFilePathForAppend.getParent());
-    Path partitionPath = FSUtils.getPartitionPath(config.getBasePath(), 
relativePartitionPath);
-
-    // NOTE: Since we're rolling back incomplete Delta Commit, it only could 
have appended its
-    //       block to the latest log-file
-    // TODO(HUDI-1517) use provided marker-file's path instead
-    HoodieLogFile latestLogFile = 
FSUtils.getLatestLogFile(table.getMetaClient().getFs(), partitionPath, fileId,
-        HoodieFileFormat.HOODIE_LOG.getFileExtension(), baseCommitTime).get();
-
-    // NOTE: Marker's don't carry information about the cumulative size of the 
blocks that have been appended,
-    //       therefore we simply stub this value.
-    Map<String, Long> logFilesWithBlocsToRollback =
-        
Collections.singletonMap(latestLogFile.getFileStatus().getPath().toString(), 
-1L);
+  private SerializablePairFunction<String, Pair<String, String>, 
HoodieRollbackRequest> getRollbackReqGenerateFunction(
+      HoodieInstant instantToRollback) {
+    return markerFilePath -> {
+      String typeStr = 
markerFilePath.substring(markerFilePath.lastIndexOf(".") + 1);
+      IOType type = IOType.valueOf(typeStr);
+      String partitionFilePath = 
WriteMarkers.stripMarkerSuffix(markerFilePath);
+      Path fullFilePath = new Path(basePath, partitionFilePath);
+      String partitionPath = FSUtils.getRelativePartitionPath(new 
Path(basePath), fullFilePath.getParent());
+      switch (type) {
+        case MERGE:
+        case CREATE:
+          HoodieBaseFile baseFileToDelete = new 
HoodieBaseFile(fullFilePath.toString());
+          String fileId = baseFileToDelete.getFileId();
+          String baseInstantTime = baseFileToDelete.getCommitTime();
+          return Pair.of(Pair.of(partitionPath, fileId),
+              new HoodieRollbackRequest(partitionPath, fileId, baseInstantTime,
+                  Collections.singletonList(fullFilePath.toString()),
+                  Collections.emptyMap()));
+        case APPEND:
+          HoodieRollbackRequest rollbackRequestForAppend = 
getRollbackRequestForAppend(partitionFilePath);
+          return Pair.of(Pair.of(partitionPath, 
rollbackRequestForAppend.getFileId()),
+              rollbackRequestForAppend);
+        default:
+          throw new HoodieRollbackException("Unknown marker type, during 
rollback of " + instantToRollback);
+      }
+    };
+  }
+
+  private SerializableBiFunction<HoodieRollbackRequest, HoodieRollbackRequest, 
HoodieRollbackRequest> getRollbackReqCombineFunction() {
+    return (rollbackReq1, rollbackReq2) -> {
+      List<String> filesToBeDeleted = new LinkedList<>();
+      filesToBeDeleted.addAll(rollbackReq1.getFilesToBeDeleted());
+      filesToBeDeleted.addAll(rollbackReq2.getFilesToBeDeleted());
+      final Comparator<HoodieLogFile> logFileComparator = 
HoodieLogFile.getLogFileComparator();
+      HoodieLogFile latestLogFile = null;
+      long latestLogFileLen = -1;
+
+      for (Map.Entry<String, Long> pathLengthPair : 
rollbackReq1.getLogBlocksToBeDeleted().entrySet()) {
+        HoodieLogFile candidateLogFile = new 
HoodieLogFile(pathLengthPair.getKey());
+        if (latestLogFile == null || logFileComparator.compare(latestLogFile, 
candidateLogFile) >= 0) {
+          latestLogFile = candidateLogFile;
+          latestLogFileLen = pathLengthPair.getValue();
+        }
+      }
 
+      for (Map.Entry<String, Long> pathLengthPair : 
rollbackReq2.getLogBlocksToBeDeleted().entrySet()) {
+        HoodieLogFile candidateLogFile = new 
HoodieLogFile(pathLengthPair.getKey());
+        if (latestLogFile == null || logFileComparator.compare(latestLogFile, 
candidateLogFile) >= 0) {
+          latestLogFile = candidateLogFile;
+          latestLogFileLen = pathLengthPair.getValue();
+        }
+      }
+      return new HoodieRollbackRequest(rollbackReq1.getPartitionPath(), 
rollbackReq1.getFileId(),
+          rollbackReq1.getLatestBaseInstant(), filesToBeDeleted,
+          latestLogFile == null ? Collections.emptyMap() :
+              Collections.singletonMap(latestLogFile.getPath().toString(), 
latestLogFileLen));
+    };
+  }
+
+  protected HoodieRollbackRequest getRollbackRequestForAppend(String 
markerFilePath) {
+    Path filePath = new Path(basePath, markerFilePath);

Review comment:
     Correct me if my understanding is wrong: I thought this would get 
simplified a lot once we start generating a marker file per log file. 
   Just from the marker file path name, can't we deduce the log file that needs 
to be rolled back?  
   Prior to this patch, we didn't create a marker file w/ log file info, and hence 
we had to deduce the latest log file. But now, with this patch, wouldn't that 
get simplified? 
   Can you help me understand? 
   
   

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java
##########
@@ -73,54 +80,110 @@ public MarkerBasedRollbackStrategy(HoodieTable<?, ?, ?, ?> 
table, HoodieEngineCo
       List<String> markerPaths = MarkerBasedRollbackUtils.getAllMarkerPaths(
           table, context, instantToRollback.getTimestamp(), 
config.getRollbackParallelism());
       int parallelism = Math.max(Math.min(markerPaths.size(), 
config.getRollbackParallelism()), 1);
-      return context.map(markerPaths, markerFilePath -> {
-        String typeStr = 
markerFilePath.substring(markerFilePath.lastIndexOf(".") + 1);
-        IOType type = IOType.valueOf(typeStr);
-        switch (type) {
-          case MERGE:
-          case CREATE:
-            String fileToDelete = 
WriteMarkers.stripMarkerSuffix(markerFilePath);
-            Path fullDeletePath = new Path(basePath, fileToDelete);
-            String partitionPath = FSUtils.getRelativePartitionPath(new 
Path(basePath), fullDeletePath.getParent());
-            return new HoodieRollbackRequest(partitionPath, EMPTY_STRING, 
EMPTY_STRING,
-                Collections.singletonList(fullDeletePath.toString()),
-                Collections.emptyMap());
-          case APPEND:
-            // NOTE: This marker file-path does NOT correspond to a log-file, 
but rather is a phony
-            //       path serving as a "container" for the following 
components:
-            //          - Base file's file-id
-            //          - Base file's commit instant
-            //          - Partition path
-            return 
getRollbackRequestForAppend(WriteMarkers.stripMarkerSuffix(markerFilePath));
-          default:
-            throw new HoodieRollbackException("Unknown marker type, during 
rollback of " + instantToRollback);
-        }
-      }, parallelism);
+      return context.mapToPairAndReduceByKey(markerPaths,
+          // generate rollback request per marker file
+          getRollbackReqGenerateFunction(instantToRollback),
+          // NOTE: Since we're rolling back incomplete Delta Commit, it only 
could have appended its
+          //       block to the latest log-file. But we cannot simply get the 
latest log-file by one marker file.
+          //       So compare log-files in the same fileGroup and get the 
latest one.
+          getRollbackReqCombineFunction(), parallelism);
     } catch (Exception e) {
       throw new HoodieRollbackException("Error rolling back using marker files 
written for " + instantToRollback, e);
     }
   }
 
-  protected HoodieRollbackRequest getRollbackRequestForAppend(String 
markerFilePath) throws IOException {
-    Path baseFilePathForAppend = new Path(basePath, markerFilePath);
-    String fileId = FSUtils.getFileIdFromFilePath(baseFilePathForAppend);
-    String baseCommitTime = 
FSUtils.getCommitTime(baseFilePathForAppend.getName());
-    String relativePartitionPath = FSUtils.getRelativePartitionPath(new 
Path(basePath), baseFilePathForAppend.getParent());
-    Path partitionPath = FSUtils.getPartitionPath(config.getBasePath(), 
relativePartitionPath);
-
-    // NOTE: Since we're rolling back incomplete Delta Commit, it only could 
have appended its
-    //       block to the latest log-file
-    // TODO(HUDI-1517) use provided marker-file's path instead
-    HoodieLogFile latestLogFile = 
FSUtils.getLatestLogFile(table.getMetaClient().getFs(), partitionPath, fileId,
-        HoodieFileFormat.HOODIE_LOG.getFileExtension(), baseCommitTime).get();
-
-    // NOTE: Marker's don't carry information about the cumulative size of the 
blocks that have been appended,
-    //       therefore we simply stub this value.
-    Map<String, Long> logFilesWithBlocsToRollback =
-        
Collections.singletonMap(latestLogFile.getFileStatus().getPath().toString(), 
-1L);
+  private SerializablePairFunction<String, Pair<String, String>, 
HoodieRollbackRequest> getRollbackReqGenerateFunction(
+      HoodieInstant instantToRollback) {
+    return markerFilePath -> {
+      String typeStr = 
markerFilePath.substring(markerFilePath.lastIndexOf(".") + 1);
+      IOType type = IOType.valueOf(typeStr);
+      String partitionFilePath = 
WriteMarkers.stripMarkerSuffix(markerFilePath);
+      Path fullFilePath = new Path(basePath, partitionFilePath);
+      String partitionPath = FSUtils.getRelativePartitionPath(new 
Path(basePath), fullFilePath.getParent());
+      switch (type) {
+        case MERGE:
+        case CREATE:
+          HoodieBaseFile baseFileToDelete = new 
HoodieBaseFile(fullFilePath.toString());
+          String fileId = baseFileToDelete.getFileId();
+          String baseInstantTime = baseFileToDelete.getCommitTime();
+          return Pair.of(Pair.of(partitionPath, fileId),
+              new HoodieRollbackRequest(partitionPath, fileId, baseInstantTime,
+                  Collections.singletonList(fullFilePath.toString()),
+                  Collections.emptyMap()));
+        case APPEND:
+          HoodieRollbackRequest rollbackRequestForAppend = 
getRollbackRequestForAppend(partitionFilePath);
+          return Pair.of(Pair.of(partitionPath, 
rollbackRequestForAppend.getFileId()),
+              rollbackRequestForAppend);
+        default:
+          throw new HoodieRollbackException("Unknown marker type, during 
rollback of " + instantToRollback);
+      }
+    };
+  }
+
+  private SerializableBiFunction<HoodieRollbackRequest, HoodieRollbackRequest, 
HoodieRollbackRequest> getRollbackReqCombineFunction() {
+    return (rollbackReq1, rollbackReq2) -> {
+      List<String> filesToBeDeleted = new LinkedList<>();
+      filesToBeDeleted.addAll(rollbackReq1.getFilesToBeDeleted());
+      filesToBeDeleted.addAll(rollbackReq2.getFilesToBeDeleted());
+      final Comparator<HoodieLogFile> logFileComparator = 
HoodieLogFile.getLogFileComparator();
+      HoodieLogFile latestLogFile = null;
+      long latestLogFileLen = -1;
+
+      for (Map.Entry<String, Long> pathLengthPair : 
rollbackReq1.getLogBlocksToBeDeleted().entrySet()) {
+        HoodieLogFile candidateLogFile = new 
HoodieLogFile(pathLengthPair.getKey());
+        if (latestLogFile == null || logFileComparator.compare(latestLogFile, 
candidateLogFile) >= 0) {
+          latestLogFile = candidateLogFile;
+          latestLogFileLen = pathLengthPair.getValue();
+        }
+      }
 
+      for (Map.Entry<String, Long> pathLengthPair : 
rollbackReq2.getLogBlocksToBeDeleted().entrySet()) {
+        HoodieLogFile candidateLogFile = new 
HoodieLogFile(pathLengthPair.getKey());
+        if (latestLogFile == null || logFileComparator.compare(latestLogFile, 
candidateLogFile) >= 0) {
+          latestLogFile = candidateLogFile;
+          latestLogFileLen = pathLengthPair.getValue();
+        }
+      }
+      return new HoodieRollbackRequest(rollbackReq1.getPartitionPath(), 
rollbackReq1.getFileId(),
+          rollbackReq1.getLatestBaseInstant(), filesToBeDeleted,
+          latestLogFile == null ? Collections.emptyMap() :
+              Collections.singletonMap(latestLogFile.getPath().toString(), 
latestLogFileLen));
+    };
+  }
+
+  protected HoodieRollbackRequest getRollbackRequestForAppend(String 
markerFilePath) {
+    Path filePath = new Path(basePath, markerFilePath);

Review comment:
       Or am I missing something? 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to