guanziyue commented on code in PR #9553:
URL: https://github.com/apache/hudi/pull/9553#discussion_r1307822743


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java:
##########
@@ -75,64 +82,115 @@ public List<HoodieRollbackRequest> 
getRollbackRequests(HoodieInstant instantToRo
       List<String> markerPaths = MarkerBasedRollbackUtils.getAllMarkerPaths(
           table, context, instantToRollback.getTimestamp(), 
config.getRollbackParallelism());
       int parallelism = Math.max(Math.min(markerPaths.size(), 
config.getRollbackParallelism()), 1);
-      return context.map(markerPaths, markerFilePath -> {
+      List<HoodieRollbackRequest> rollbackRequests = context.map(markerPaths, 
markerFilePath -> {
         String typeStr = 
markerFilePath.substring(markerFilePath.lastIndexOf(".") + 1);
         IOType type = IOType.valueOf(typeStr);
+        String fileNameWithPartitionToRollback = 
WriteMarkers.stripMarkerSuffix(markerFilePath);
+        Path fullFilePathToRollback = new Path(basePath, 
fileNameWithPartitionToRollback);
+        String partitionPath = FSUtils.getRelativePartitionPath(new 
Path(basePath), fullFilePathToRollback.getParent());
         switch (type) {
           case MERGE:
           case CREATE:
-            String fileToDelete = 
WriteMarkers.stripMarkerSuffix(markerFilePath);
-            Path fullDeletePath = new Path(basePath, fileToDelete);
-            String partitionPath = FSUtils.getRelativePartitionPath(new 
Path(basePath), fullDeletePath.getParent());
-            return new HoodieRollbackRequest(partitionPath, EMPTY_STRING, 
EMPTY_STRING,
-                Collections.singletonList(fullDeletePath.toString()),
+            String fileId = null;
+            String baseInstantTime = null;
+            if (FSUtils.isBaseFile(fullFilePathToRollback)) {
+              HoodieBaseFile baseFileToDelete = new 
HoodieBaseFile(fullFilePathToRollback.toString());
+              fileId = baseFileToDelete.getFileId();
+              baseInstantTime = baseFileToDelete.getCommitTime();
+            } else if (FSUtils.isLogFile(fullFilePathToRollback)) {
+              // TODO: HUDI-1517 may distinguish log file created from log 
file being appended in the future @guanziyue
+              // Now it should not have create type
+              checkArgument(type != IOType.CREATE, "Log file should not 
support create io type now");
+              checkArgument(type != IOType.MERGE, "Log file should not support 
merge io type");
+              HoodieLogFile logFileToDelete = new 
HoodieLogFile(fullFilePathToRollback.toString());
+              fileId = logFileToDelete.getFileId();
+              baseInstantTime = logFileToDelete.getBaseCommitTime();
+            }
+            Objects.requireNonNull(fileId, "Cannot find valid fileId from 
path: " + fullFilePathToRollback);
+            Objects.requireNonNull(baseInstantTime, "Cannot find valid base 
instant from path: " + fullFilePathToRollback);
+            return new HoodieRollbackRequest(partitionPath, fileId, 
baseInstantTime,
+                Collections.singletonList(fullFilePathToRollback.toString()),
                 Collections.emptyMap());
           case APPEND:
-            // NOTE: This marker file-path does NOT correspond to a log-file, 
but rather is a phony
-            //       path serving as a "container" for the following 
components:
-            //          - Base file's file-id
-            //          - Base file's commit instant
-            //          - Partition path
-            return getRollbackRequestForAppend(instantToRollback, 
WriteMarkers.stripMarkerSuffix(markerFilePath));
+            HoodieRollbackRequest rollbackRequestForAppend = 
getRollbackRequestForAppend(instantToRollback, fileNameWithPartitionToRollback);
+            return rollbackRequestForAppend;
           default:
             throw new HoodieRollbackException("Unknown marker type, during 
rollback of " + instantToRollback);
         }
       }, parallelism);
+      // we need to ensure we return one rollback request per partition path, 
fileId, baseInstant triplet.
+      List<HoodieRollbackRequest> processedRollbackRequests = 
context.parallelize(rollbackRequests)
+          .mapToPair(rollbackRequest -> 
Pair.of(Pair.of(rollbackRequest.getPartitionPath(), 
rollbackRequest.getFileId()), rollbackRequest))
+          .reduceByKey((SerializableBiFunction<HoodieRollbackRequest, 
HoodieRollbackRequest, HoodieRollbackRequest>) (hoodieRollbackRequest, 
hoodieRollbackRequest2)
+              -> RollbackUtils.mergeRollbackRequest(hoodieRollbackRequest, 
hoodieRollbackRequest2), parallelism).values().collectAsList();
+
+      return processedRollbackRequests;
     } catch (Exception e) {
       throw new HoodieRollbackException("Error rolling back using marker files 
written for " + instantToRollback, e);
     }
   }
 
-  protected HoodieRollbackRequest getRollbackRequestForAppend(HoodieInstant 
instantToRollback, String markerFilePath) throws IOException {
-    Path baseFilePathForAppend = new Path(basePath, markerFilePath);
-    String fileId = FSUtils.getFileIdFromFilePath(baseFilePathForAppend);
-    String baseCommitTime = 
FSUtils.getCommitTime(baseFilePathForAppend.getName());
-    String relativePartitionPath = FSUtils.getRelativePartitionPath(new 
Path(basePath), baseFilePathForAppend.getParent());
-    Path partitionPath = FSUtils.getPartitionPath(config.getBasePath(), 
relativePartitionPath);
-
-    // NOTE: Since we're rolling back incomplete Delta Commit, it only could 
have appended its
-    //       block to the latest log-file
-    // TODO(HUDI-1517) use provided marker-file's path instead
-    Option<HoodieLogFile> latestLogFileOption = 
FSUtils.getLatestLogFile(table.getMetaClient().getFs(), partitionPath, fileId,
-        HoodieFileFormat.HOODIE_LOG.getFileExtension(), baseCommitTime);
-
-    // Log file can be deleted if the commit to rollback is also the commit 
that created the fileGroup
-    if (latestLogFileOption.isPresent() && 
baseCommitTime.equals(instantToRollback.getTimestamp())) {
-      Path fullDeletePath = new Path(partitionPath, 
latestLogFileOption.get().getFileName());
-      return new HoodieRollbackRequest(relativePartitionPath, EMPTY_STRING, 
EMPTY_STRING,
-          Collections.singletonList(fullDeletePath.toString()),
-          Collections.emptyMap());
-    }
-    
-    Map<String, Long> logFilesWithBlocsToRollback = new HashMap<>();
-    if (latestLogFileOption.isPresent()) {
-      HoodieLogFile latestLogFile = latestLogFileOption.get();
+  protected HoodieRollbackRequest getRollbackRequestForAppend(HoodieInstant 
instantToRollback, String fileNameWithPartitionToRollback) throws IOException {
+    Path filePath = new Path(basePath, fileNameWithPartitionToRollback);
+    String fileId;
+    String baseCommitTime;
+    String relativePartitionPath;
+    Option<HoodieLogFile> latestLogFileOption;
+
+    // Old marker files may be generated from base file name before HUDI-1517. 
keep compatible with them.
+    // TODO: deprecated in HUDI-1517, may be removed in the future. 
@guanziyue.gzy
+
+    Map<String, Long> logFilesWithBlocksToRollback = new HashMap<>();
+    if (FSUtils.isBaseFile(filePath)) {
+      LOG.warn("Find old marker type for log file: " + 
fileNameWithPartitionToRollback);
+      fileId = FSUtils.getFileIdFromFilePath(filePath);
+      baseCommitTime = FSUtils.getCommitTime(filePath.getName());
+      relativePartitionPath = FSUtils.getRelativePartitionPath(new 
Path(basePath), filePath.getParent());
+      Path partitionPath = FSUtils.getPartitionPath(config.getBasePath(), 
relativePartitionPath);
+
+      // NOTE: Since we're rolling back incomplete Delta Commit, it only could 
have appended its
+      //       block to the latest log-file
+      try {
+        latestLogFileOption = 
FSUtils.getLatestLogFile(table.getMetaClient().getFs(), partitionPath, fileId,
+            HoodieFileFormat.HOODIE_LOG.getFileExtension(), baseCommitTime);
+        if (latestLogFileOption.isPresent() && 
baseCommitTime.equals(instantToRollback.getTimestamp())) {
+          HoodieLogFile latestLogFile = latestLogFileOption.get();
+          // NOTE: Marker's don't carry information about the cumulative size 
of the blocks that have been appended,
+          //       therefore we simply stub this value.
+          FileSystem fileSystem = table.getMetaClient().getFs();
+          List<Option<FileStatus>> fileStatuses = 
FSUtils.getFileStatusesUnderPartition(fileSystem, filePath.getParent(), 
Collections.singletonList(filePath.getName()), true);

Review Comment:
   Is it better to use getFileStatus here, since we only have one file? FSUtils.getFileStatusesUnderPartition is intended to improve performance when fetching more than one file under a directory.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java:
##########
@@ -276,4 +283,27 @@ protected static Option<IndexedRecord> 
toAvroRecord(HoodieRecord record, Schema
       return Option.empty();
     }
   }
+
+  protected class AppendLogWriteCallback implements HoodieLogFileWriteCallback 
{
+    // we will create marker file for each log file and include all log files 
to commit message
+    // so that all log files generated in this job will be included in commit 
metadata. This is important for MDT.
+    // Assuming if spark task failed, the log file generated by it will not 
appear in WriteStatus
+
+    @Override
+    public boolean preLogFileOpen(HoodieLogFile logFileToAppend) {
+      return createAppendMarker(logFileToAppend);
+    }
+
+    @Override
+    public boolean preLogFileCreate(HoodieLogFile logFileToCreate) {
+      // TODO: HUDI-1517 may distinguish log file created from log file being 
appended in the future @guanziyue
+      return createAppendMarker(logFileToCreate);
+    }
+
+    private boolean createAppendMarker(HoodieLogFile logFileToAppend) {
+      WriteMarkers writeMarkers = 
WriteMarkersFactory.get(config.getMarkersType(), hoodieTable, instantTime);
+      return writeMarkers.create(partitionPath, logFileToAppend.getFileName(), 
IOType.APPEND,

Review Comment:
   Fix this



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackHelper.java:
##########
@@ -79,14 +93,28 @@ public List<HoodieRollbackStat> 
performRollback(HoodieEngineContext context, Hoo
     // stack trace: 
https://gist.github.com/nsivabalan/b6359e7d5038484f8043506c8bc9e1c8
     // related stack overflow post: 
https://issues.apache.org/jira/browse/SPARK-3601. Avro deserializes list as 
GenericData.Array.
     List<SerializableHoodieRollbackRequest> serializableRequests = 
rollbackRequests.stream().map(SerializableHoodieRollbackRequest::new).collect(Collectors.toList());
-    return context.reduceByKey(maybeDeleteAndCollectStats(context, 
instantToRollback, serializableRequests, true, parallelism),
-        RollbackUtils::mergeRollbackStat, parallelism);
+    WriteMarkers markers = WriteMarkersFactory.get(config.getMarkersType(), 
table, instantTime);
+
+    // Considering rollback may failed before, which generated some additional 
log files. We need to add these log files back.
+    Set<String> logPaths = new HashSet<>();
+    try {
+      logPaths = markers.getAppendedLogPaths(context, 
config.getFinalizeWriteParallelism());

Review Comment:
   hi codope, we do need a list operation here to find all the log files we may have generated before. This will return quickly when no previous rollback has failed.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackHelper.java:
##########
@@ -59,18 +71,20 @@ public class BaseRollbackHelper implements Serializable {
   private static final Logger LOG = 
LoggerFactory.getLogger(BaseRollbackHelper.class);
   protected static final String EMPTY_STRING = "";
 
+  protected final HoodieTable table;
   protected final HoodieTableMetaClient metaClient;
   protected final HoodieWriteConfig config;
 
-  public BaseRollbackHelper(HoodieTableMetaClient metaClient, 
HoodieWriteConfig config) {
-    this.metaClient = metaClient;
+  public BaseRollbackHelper(HoodieTable table, HoodieWriteConfig config) {
+    this.table = table;
+    this.metaClient = table.getMetaClient();
     this.config = config;
   }
 
   /**
    * Performs all rollback actions that we have collected in parallel.
    */
-  public List<HoodieRollbackStat> performRollback(HoodieEngineContext context, 
HoodieInstant instantToRollback,
+  public List<HoodieRollbackStat> performRollback(HoodieEngineContext context, 
String instantTime, HoodieInstant instantToRollback,

Review Comment:
   Are we going to generate markers using the rollback instant time rather than the delta commit instant time? Just curious about the reason — is it because it gives better performance here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to