This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 9744de26709 [HUDI-6246] Add tests to validate restore of compaction commit (#9157)
9744de26709 is described below

commit 9744de26709a667f11aebae90f2d10a8fd93e0b5
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Mon Jul 10 03:18:58 2023 -0400

    [HUDI-6246] Add tests to validate restore of compaction commit (#9157)
---
 .../TestMergeOnReadRollbackActionExecutor.java     | 78 ++++++++++++++++++++++
 1 file changed, 78 insertions(+)

diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java
index b3d10db6e52..848b51a6850 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java
@@ -44,6 +44,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.marker.WriteMarkersFactory;
+import org.apache.hudi.testutils.Assertions;
 import org.apache.hudi.testutils.MetadataMergeWriteStatus;
 
 import org.apache.spark.api.java.JavaRDD;
@@ -63,6 +64,7 @@ import java.util.stream.Stream;
 
 import static 
org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
 import static 
org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH;
+import static 
org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -160,6 +162,82 @@ public class TestMergeOnReadRollbackActionExecutor extends HoodieClientRollbackT
     assertFalse(WriteMarkersFactory.get(cfg.getMarkersType(), table, 
"002").doesMarkerDirExist());
   }
 
+  @Test
+  public void testMergeOnReadRestoreCompactionCommit() throws IOException {
+    boolean isUsingMarkers = false;
+    HoodieWriteConfig cfg = 
getConfigBuilder().withRollbackUsingMarkers(isUsingMarkers).withAutoCommit(false).build();
+
+    /*
+     * Checks that rolling back delta commits in restore mode removes the data they wrote:
+     * partition 3 is written by commit 0000001 and must keep its file groups, while
+     * partitions 1 and 2 are written by the commits created in step 2 (presumably instants
+     * 001 and 002, the instants rolled back below - TODO confirm against
+     * twoUpsertCommitDataWithTwoPartitions) and must end up with no file groups.
+     *
+     * NOTE(review): despite the method name, this method body neither schedules nor rolls
+     * back a compaction; both rolled-back instants use DELTA_COMMIT_ACTION.
+     *
+     * 1. ingest data to partition 3.
+     */
+    // the generator below only produces records for the third partition
+    HoodieTestDataGenerator dataGenPartition3 = new 
HoodieTestDataGenerator(new String[]{DEFAULT_THIRD_PARTITION_PATH});
+    HoodieTestDataGenerator.writePartitionMetadataDeprecated(fs, new 
String[]{DEFAULT_THIRD_PARTITION_PATH}, basePath);
+    SparkRDDWriteClient client = getHoodieWriteClient(cfg);
+    // NOTE(review): `client` above is never closed in this method; presumably the test harness closes it - confirm
+    /**
+     * Write 1 (only inserts) into partition 3 as commit 0000001. Auto-commit is disabled in
+     * cfg, so the commit is completed explicitly after checking for write errors.
+     */
+    String newCommitTime = "0000001";
+    client.startCommitWithTime(newCommitTime);
+    List<HoodieRecord> records = 
dataGenPartition3.generateInsertsContainsAllPartitions(newCommitTime, 2);
+    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+    JavaRDD<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime);
+    Assertions.assertNoWriteErrors(statuses.collect());
+    client.commit(newCommitTime, statuses);
+
+    //2. Ingest inserts + upserts to partition 1 and 2. we will eventually 
rollback both these commits using restore flow.
+    List<FileSlice> firstPartitionCommit2FileSlices = new ArrayList<>();
+    List<FileSlice> secondPartitionCommit2FileSlices = new ArrayList<>();
+    twoUpsertCommitDataWithTwoPartitions(firstPartitionCommit2FileSlices, 
secondPartitionCommit2FileSlices, cfg, !isUsingMarkers);
+    List<HoodieLogFile> firstPartitionCommit2LogFiles = new ArrayList<>();
+    List<HoodieLogFile> secondPartitionCommit2LogFiles = new ArrayList<>();
+    
firstPartitionCommit2FileSlices.get(0).getLogFiles().collect(Collectors.toList()).forEach(logFile
 -> firstPartitionCommit2LogFiles.add(logFile));
+    assertEquals(1, firstPartitionCommit2LogFiles.size());
+    
secondPartitionCommit2FileSlices.get(0).getLogFiles().collect(Collectors.toList()).forEach(logFile
 -> secondPartitionCommit2LogFiles.add(logFile));
+    assertEquals(1, secondPartitionCommit2LogFiles.size());
+    HoodieTable table = this.getHoodieTable(metaClient, cfg);
+
+    /*
+     * 3. rollback the update (instant 002) to partition1 and partition2, using a rollback
+     * plan and rollback executed at instant 003.
+     * NOTE(review): the first HoodieInstant constructor argument appears to be the inflight
+     * flag; passing isUsingMarkers (false) conflates two unrelated settings - confirm and
+     * consider a literal.
+     * NOTE(review): the trailing `true` passed to BaseRollbackPlanActionExecutor is presumably
+     * the restore flag - verify against its constructor signature.
+     */
+    HoodieInstant rollBackInstant = new HoodieInstant(isUsingMarkers, 
HoodieTimeline.DELTA_COMMIT_ACTION, "002");
+    BaseRollbackPlanActionExecutor mergeOnReadRollbackPlanActionExecutor =
+        new BaseRollbackPlanActionExecutor(context, cfg, table, "003", 
rollBackInstant, false,
+            cfg.shouldRollbackUsingMarkers(), true);
+    mergeOnReadRollbackPlanActionExecutor.execute().get();
+    MergeOnReadRollbackActionExecutor mergeOnReadRollbackActionExecutor = new 
MergeOnReadRollbackActionExecutor(
+        context,
+        cfg,
+        table,
+        "003",
+        rollBackInstant,
+        true,
+        false);
+    //4. assert the rollback stats: metadata covers both partitions and the markers for 002 are gone
+    Map<String, HoodieRollbackPartitionMetadata> rollbackMetadata = 
mergeOnReadRollbackActionExecutor.execute().getPartitionMetadata();
+    assertEquals(2, rollbackMetadata.size());
+    assertFalse(WriteMarkersFactory.get(cfg.getMarkersType(), table, 
"002").doesMarkerDirExist());
+
+    // 5. rollback 001 as well (plan and rollback at instant 004). this time since it's part of the restore, entire 
file slice should be deleted and not just log files (for partition1 and 
partition2)
+    HoodieInstant rollBackInstant1 = new HoodieInstant(isUsingMarkers, 
HoodieTimeline.DELTA_COMMIT_ACTION, "001");
+    BaseRollbackPlanActionExecutor mergeOnReadRollbackPlanActionExecutor1 =
+        new BaseRollbackPlanActionExecutor(context, cfg, table, "004", 
rollBackInstant1, false,
+            cfg.shouldRollbackUsingMarkers(), true);
+    mergeOnReadRollbackPlanActionExecutor1.execute().get();
+    MergeOnReadRollbackActionExecutor mergeOnReadRollbackActionExecutor1 = new 
MergeOnReadRollbackActionExecutor(
+        context,
+        cfg,
+        table,
+        "004",
+        rollBackInstant1,
+        true,
+        false);
+    mergeOnReadRollbackActionExecutor1.execute().getPartitionMetadata(); // NOTE(review): returned rollback metadata is not asserted
+
+    //6. assert there are no valid file groups in both partition1 and partition2
+    assertEquals(0, 
table.getFileSystemView().getAllFileGroups(DEFAULT_FIRST_PARTITION_PATH).count());
+    assertEquals(0, 
table.getFileSystemView().getAllFileGroups(DEFAULT_SECOND_PARTITION_PATH).count());
+    // and only the 3rd partition (written by commit 0000001) should still have valid file groups.
+    
assertTrue(table.getFileSystemView().getAllFileGroups(DEFAULT_THIRD_PARTITION_PATH).count()
 > 0);
+  }
+
   @Test
   public void testRollbackForCanIndexLogFile() throws IOException {
     //1. prepare data and assert data result

Reply via email to