nsivabalan commented on a change in pull request #3651:
URL: https://github.com/apache/hudi/pull/3651#discussion_r709190362



##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseMarkerBasedRollbackStrategy.java
##########
@@ -18,59 +18,60 @@
 
 package org.apache.hudi.table.action.rollback;
 
-import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.HoodieRollbackStat;
+import org.apache.hudi.avro.model.HoodieRollbackRequest;
 import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.IOType;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieRollbackException;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.marker.MarkerBasedRollbackUtils;
 import org.apache.hudi.table.marker.WriteMarkers;
 
+import org.apache.hadoop.fs.Path;
+
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
-import java.util.stream.Collectors;
 
-@SuppressWarnings("checkstyle:LineLength")
-public class JavaMarkerBasedRollbackStrategy<T extends HoodieRecordPayload> 
extends AbstractMarkerBasedRollbackStrategy<T, List<HoodieRecord<T>>, 
List<HoodieKey>, List<WriteStatus>> {
-  public JavaMarkerBasedRollbackStrategy(HoodieTable<T, List<HoodieRecord<T>>, 
List<HoodieKey>, List<WriteStatus>> table,
-                                         HoodieEngineContext context,
-                                         HoodieWriteConfig config,
-                                         String instantTime) {
+import static 
org.apache.hudi.table.action.rollback.BaseRollbackHelper.EMPTY_STRING;
+
+public class BaseMarkerBasedRollbackStrategy<T extends HoodieRecordPayload, I, 
K, O> extends AbstractMarkerBasedRollbackStrategy {
+
+  public BaseMarkerBasedRollbackStrategy(HoodieTable<T, I, K, O> table, 
HoodieEngineContext context, HoodieWriteConfig config, String instantTime) {
     super(table, context, config, instantTime);
   }
 
   @Override
-  public List<HoodieRollbackStat> execute(HoodieInstant instantToRollback) {
+  public List<HoodieRollbackRequest> getRollbackRequest(HoodieInstant 
instantToRollback) {
     try {
       List<String> markerPaths = MarkerBasedRollbackUtils.getAllMarkerPaths(
           table, context, instantToRollback.getTimestamp(), 
config.getRollbackParallelism());
-      List<HoodieRollbackStat> rollbackStats = context.map(markerPaths, 
markerFilePath -> {
+      List<HoodieRollbackRequest> rollbackRequests = new ArrayList<>();
+      int parallelism = Math.max(Math.min(markerPaths.size(), 
config.getRollbackParallelism()), 1);
+      context.foreach(markerPaths, markerFilePath -> {
         String typeStr = 
markerFilePath.substring(markerFilePath.lastIndexOf(".") + 1);
         IOType type = IOType.valueOf(typeStr);
         switch (type) {
           case MERGE:
-            return undoMerge(WriteMarkers.stripMarkerSuffix(markerFilePath));
-          case APPEND:
-            return undoAppend(WriteMarkers.stripMarkerSuffix(markerFilePath), 
instantToRollback);
           case CREATE:
-            return undoCreate(WriteMarkers.stripMarkerSuffix(markerFilePath));
+            String fileToDelete = 
WriteMarkers.stripMarkerSuffix(markerFilePath);
+            Path fullDeletePath = new Path(basePath, fileToDelete);
+            String partitionPath = FSUtils.getRelativePartitionPath(new 
Path(basePath), fullDeletePath.getParent());

Review comment:
       https://issues.apache.org/jira/browse/HUDI-2437




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to