vinothchandar commented on a change in pull request #2421:
URL: https://github.com/apache/hudi/pull/2421#discussion_r554648625



##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/AbstractMarkerBasedRollbackStrategy.java
##########
@@ -90,6 +90,8 @@ protected HoodieRollbackStat undoAppend(String 
appendBaseFilePath, HoodieInstant
     String fileId = FSUtils.getFileIdFromFilePath(baseFilePathForAppend);
     String baseCommitTime = 
FSUtils.getCommitTime(baseFilePathForAppend.getName());
     String partitionPath = FSUtils.getRelativePartitionPath(new 
Path(basePath), new Path(basePath, appendBaseFilePath).getParent());
+    final Map<FileStatus, Long> writtenLogFileSizeMap = 
config.useFileListingMetadata()

Review comment:
       Hmmm. Can we remove this flag as we discussed before? Using the file 
listing flag here is very confusing and crosses a lot of layers. If you are 
worried about the new code breaking, then would a catch block returning an empty 
map be good enough? The downside is that it will make issues here harder to detect. 

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java
##########
@@ -116,6 +119,15 @@ public ListingBasedRollbackHelper(HoodieTableMetaClient 
metaClient, HoodieWriteC
                   .withDeletedFileResults(filesToDeletedStatus).build());
         }
         case APPEND_ROLLBACK_BLOCK: {
+          // collect all log files that is supposed to be deleted with this 
rollback
+          Map<FileStatus, Long> writtenLogFileSizeMap = Collections.EMPTY_MAP;
+          if (config.useFileListingMetadata()) {

Review comment:
       Same here — let's not reference this config here.

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java
##########
@@ -116,6 +119,15 @@ public ListingBasedRollbackHelper(HoodieTableMetaClient 
metaClient, HoodieWriteC
                   .withDeletedFileResults(filesToDeletedStatus).build());
         }
         case APPEND_ROLLBACK_BLOCK: {
+          // collect all log files that is supposed to be deleted with this 
rollback
+          Map<FileStatus, Long> writtenLogFileSizeMap = Collections.EMPTY_MAP;
+          if (config.useFileListingMetadata()) {
+            Path partitionPath = 
FSUtils.getPartitionPath(config.getBasePath(), 
rollbackRequest.getPartitionPath());
+            writtenLogFileSizeMap = metaClient.getFs().exists(partitionPath) ? 
FSUtils.getAllLogFiles(metaClient.getFs(), partitionPath,

Review comment:
       What does `FSUtils.getAllLogFiles` return if the partition does not 
exist? Can we avoid the `.exists()` call?

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -239,7 +240,8 @@ private static void 
processRollbackMetadata(HoodieRollbackMetadata rollbackMetad
 
     rollbackMetadata.getPartitionMetadata().values().forEach(pm -> {
       // Has this rollback produced new files?
-      boolean hasAppendFiles = 
pm.getAppendFiles().values().stream().mapToLong(Long::longValue).sum() > 0;
+      boolean hasRollbackLogFiles = pm.getRollbackLogFiles() != null && 
!pm.getRollbackLogFiles().isEmpty();
+      boolean hasAppendFiles = hasRollbackLogFiles ? 
pm.getRollbackLogFiles().values().stream().mapToLong(Long::longValue).sum() > 0 
: false;

Review comment:
       should we rename `hasAppendFiles` -> `hasNonZeroRollbackLogFiles`?

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/SparkMarkerBasedRollbackStrategy.java
##########
@@ -75,4 +86,12 @@ public SparkMarkerBasedRollbackStrategy(HoodieTable<T, 
JavaRDD<HoodieRecord<T>>,
       throw new HoodieRollbackException("Error rolling back using marker files 
written for " + instantToRollback, e);
     }
   }
+
+  protected Map<FileStatus, Long> getWrittenLogFileSizeMap(String 
partitionPathStr, String baseCommitTime, String fileId) throws IOException {
+    // collect all log files that is supposed to be deleted with this rollback
+    Path partitionPath = FSUtils.getPartitionPath(config.getBasePath(), 
partitionPathStr);
+    return table.getMetaClient().getFs().exists(partitionPath) ? 
FSUtils.getAllLogFiles(table.getMetaClient().getFs(),

Review comment:
       same here.

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/SparkMarkerBasedRollbackStrategy.java
##########
@@ -32,16 +35,24 @@
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.MarkerFiles;
 
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 
+import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
 
 import scala.Tuple2;
 
 @SuppressWarnings("checkstyle:LineLength")
 public class SparkMarkerBasedRollbackStrategy<T extends HoodieRecordPayload> 
extends AbstractMarkerBasedRollbackStrategy<T, JavaRDD<HoodieRecord<T>>, 
JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> {
-  public SparkMarkerBasedRollbackStrategy(HoodieTable<T, 
JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table, 
HoodieEngineContext context, HoodieWriteConfig config, String instantTime) {
+
+  public SparkMarkerBasedRollbackStrategy(HoodieTable<T, 
JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table, 
HoodieEngineContext context, HoodieWriteConfig config,

Review comment:
       restore formatting

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -262,18 +264,31 @@ private static void 
processRollbackMetadata(HoodieRollbackMetadata rollbackMetad
         partitionToDeletedFiles.get(partition).addAll(deletedFiles);
       }
 
-      if (!pm.getAppendFiles().isEmpty()) {
+      if (pm.getRollbackLogFiles() != null && 
!pm.getRollbackLogFiles().isEmpty()) {

Review comment:
       Isn't this condition the same as `hasRollbackLogFiles`? 

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -262,18 +264,31 @@ private static void 
processRollbackMetadata(HoodieRollbackMetadata rollbackMetad
         partitionToDeletedFiles.get(partition).addAll(deletedFiles);
       }
 
-      if (!pm.getAppendFiles().isEmpty()) {
+      if (pm.getRollbackLogFiles() != null && 
!pm.getRollbackLogFiles().isEmpty()) {
         if (!partitionToAppendedFiles.containsKey(partition)) {
           partitionToAppendedFiles.put(partition, new HashMap<>());
         }
 
         // Extract appended file name from the absolute paths saved in 
getAppendFiles()
-        pm.getAppendFiles().forEach((path, size) -> {
+        pm.getRollbackLogFiles().forEach((path, size) -> {
           partitionToAppendedFiles.get(partition).merge(new 
Path(path).getName(), size, (oldSize, newSizeCopy) -> {
             return size + oldSize;
           });
         });
       }
+
+      if (pm.getWrittenLogFiles() != null && 
!pm.getWrittenLogFiles().isEmpty()) {
+        if (!partitionToAppendedFiles.containsKey(partition)) {
+          partitionToAppendedFiles.put(partition, new HashMap<>());
+        }
+
+        // Extract appended file name from the absolute paths saved in 
getWrittenLogFiles()
+        pm.getWrittenLogFiles().forEach((path, size) -> {
+          partitionToAppendedFiles.get(partition).merge(new 
Path(path).getName(), size, (oldSize, newSizeCopy) -> {
+            return oldSize;

Review comment:
       I think we should fix it to take the higher value, uniformly, and avoid 
all these caveats. The code will be more maintainable.

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -262,13 +251,26 @@ private static void 
processRollbackMetadata(HoodieRollbackMetadata rollbackMetad
         partitionToDeletedFiles.get(partition).addAll(deletedFiles);
       }
 
-      if (!pm.getAppendFiles().isEmpty()) {
+      if (pm.getRollbackLogFiles() != null && 
!pm.getRollbackLogFiles().isEmpty()) {
         if (!partitionToAppendedFiles.containsKey(partition)) {
           partitionToAppendedFiles.put(partition, new HashMap<>());
         }
 
         // Extract appended file name from the absolute paths saved in 
getAppendFiles()
-        pm.getAppendFiles().forEach((path, size) -> {
+        pm.getRollbackLogFiles().forEach((path, size) -> {
+          partitionToAppendedFiles.get(partition).merge(new 
Path(path).getName(), size, (oldSize, newSizeCopy) -> {
+            return size + oldSize;

Review comment:
       Still, should we always pick the higher value?

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
##########
@@ -430,6 +430,17 @@ public static boolean isLogFile(Path logPath) {
         .map(HoodieLogFile::new).filter(s -> 
s.getBaseCommitTime().equals(baseCommitTime));
   }
 
+  /**
+   * Get all the log files for the passed in FileId in the partition path.

Review comment:
       fix the docs?

##########
File path: hudi-common/src/main/avro/HoodieRollbackMetadata.avsc
##########
@@ -31,18 +31,22 @@
             {"name": "partitionPath", "type": "string"},
             {"name": "successDeleteFiles", "type": {"type": "array", "items": 
"string"}},
             {"name": "failedDeleteFiles", "type": {"type": "array", "items": 
"string"}},
-            {"name": "appendFiles", "type": {
+            {"name": "rollbackLogFiles", "type": {
                 "type": "map",
-                "doc": "Files to which append blocks were written",

Review comment:
       Why remove the docs? In fact, shouldn't we add docs to both fields?

##########
File path: 
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMarkerBasedRollbackStrategy.java
##########
@@ -40,88 +48,103 @@
 
 public class TestMarkerBasedRollbackStrategy extends HoodieClientTestBase {
 
+  private static final String TEST_NAME_WITH_PARAMS = "[{index}] Test with 
listing metadata enable={0}";
+
+  public static Stream<Arguments> configParams() {
+    return Arrays.stream(new Boolean[][] {{true}, {false}}).map(Arguments::of);
+  }
+
+  private HoodieTableType tableType = HoodieTableType.COPY_ON_WRITE;
+
   @BeforeEach
   public void setUp() throws Exception {
     initPath();
     initSparkContexts();
     initFileSystem();
-    initMetaClient();
-    initDFS();
+    initMetaClient(tableType);
+    initTestDataGenerator();
   }
 
   @AfterEach
   public void tearDown() throws Exception {
     cleanupResources();
   }
 
-  @Test
-  public void testCopyOnWriteRollback() throws Exception {
-    // given: wrote some base files and corresponding markers
-    HoodieTestTable testTable = HoodieTestTable.of(metaClient);
-    String f0 = testTable.addRequestedCommit("000")
-        .getFileIdsWithBaseFilesInPartitions("partA").get("partA");
-    String f1 = testTable.addCommit("001")
-        .withBaseFilesInPartition("partA", f0)
-        .getFileIdsWithBaseFilesInPartitions("partB").get("partB");
-    String f2 = "f2";
-    testTable.forCommit("001")
-        .withMarkerFile("partA", f0, IOType.MERGE)
-        .withMarkerFile("partB", f1, IOType.CREATE)
-        .withMarkerFile("partA", f2, IOType.CREATE);
-
-    // when
-    List<HoodieRollbackStat> stats = new 
SparkMarkerBasedRollbackStrategy(HoodieSparkTable.create(getConfig(), context, 
metaClient), context, getConfig(), "002")
-        .execute(new HoodieInstant(HoodieInstant.State.INFLIGHT, 
HoodieTimeline.COMMIT_ACTION, "001"));
-
-    // then: ensure files are deleted correctly, non-existent files reported 
as failed deletes
-    assertEquals(2, stats.size());
-
-    FileStatus[] partAFiles = testTable.listAllFilesInPartition("partA");
-    FileStatus[] partBFiles = testTable.listAllFilesInPartition("partB");
-
-    assertEquals(0, partBFiles.length);
-    assertEquals(1, partAFiles.length);
-    assertEquals(2, stats.stream().mapToInt(r -> 
r.getSuccessDeleteFiles().size()).sum());
-    assertEquals(1, stats.stream().mapToInt(r -> 
r.getFailedDeleteFiles().size()).sum());
+  @ParameterizedTest(name = TEST_NAME_WITH_PARAMS)

Review comment:
       Yes, but instead of replacing it, can we make yours a functional test and 
retain the existing test code?

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java
##########
@@ -116,6 +119,15 @@ public ListingBasedRollbackHelper(HoodieTableMetaClient 
metaClient, HoodieWriteC
                   .withDeletedFileResults(filesToDeletedStatus).build());
         }
         case APPEND_ROLLBACK_BLOCK: {
+          // collect all log files that is supposed to be deleted with this 
rollback
+          Map<FileStatus, Long> writtenLogFileSizeMap = Collections.EMPTY_MAP;
+          if (config.useFileListingMetadata()) {
+            Path partitionPath = 
FSUtils.getPartitionPath(config.getBasePath(), 
rollbackRequest.getPartitionPath());
+            writtenLogFileSizeMap = metaClient.getFs().exists(partitionPath) ? 
FSUtils.getAllLogFiles(metaClient.getFs(), partitionPath,

Review comment:
       Why can't we get the log files only for that fileId here? It seems like 
you are fetching all log files in that partition.

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
##########
@@ -430,6 +430,17 @@ public static boolean isLogFile(Path logPath) {
         .map(HoodieLogFile::new).filter(s -> 
s.getBaseCommitTime().equals(baseCommitTime));
   }
 
+  /**
+   * Get all the log files for the passed in FileId in the partition path.

Review comment:
       I wonder why we need this method?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to