This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 9744de26709 [HUDI-6246] Add tests to validate restore of compaction
commit (#9157)
9744de26709 is described below
commit 9744de26709a667f11aebae90f2d10a8fd93e0b5
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Mon Jul 10 03:18:58 2023 -0400
[HUDI-6246] Add tests to validate restore of compaction commit (#9157)
---
.../TestMergeOnReadRollbackActionExecutor.java | 78 ++++++++++++++++++++++
1 file changed, 78 insertions(+)
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java
index b3d10db6e52..848b51a6850 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java
@@ -44,6 +44,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.marker.WriteMarkersFactory;
+import org.apache.hudi.testutils.Assertions;
import org.apache.hudi.testutils.MetadataMergeWriteStatus;
import org.apache.spark.api.java.JavaRDD;
@@ -63,6 +64,7 @@ import java.util.stream.Stream;
import static
org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
import static
org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH;
+import static
org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -160,6 +162,82 @@ public class TestMergeOnReadRollbackActionExecutor extends
HoodieClientRollbackT
assertFalse(WriteMarkersFactory.get(cfg.getMarkersType(), table,
"002").doesMarkerDirExist());
}
+ @Test
+ public void testMergeOnReadRestoreCompactionCommit() throws IOException {
+ boolean isUsingMarkers = false;
+ HoodieWriteConfig cfg =
getConfigBuilder().withRollbackUsingMarkers(isUsingMarkers).withAutoCommit(false).build();
+ // Flow: write to partition 3, write to partitions 1 and 2 via the helper, roll back 002 then 001, verify file groups.
+ // 1. ingest data to partition 3.
+ // the data generator below is restricted to the third partition only
+ HoodieTestDataGenerator dataGenPartition3 = new
HoodieTestDataGenerator(new String[]{DEFAULT_THIRD_PARTITION_PATH});
+ HoodieTestDataGenerator.writePartitionMetadataDeprecated(fs, new
String[]{DEFAULT_THIRD_PARTITION_PATH}, basePath);
+ SparkRDDWriteClient client = getHoodieWriteClient(cfg);
+
+ /**
+ * Write 1 (only inserts); auto-commit is off, so the commit is issued explicitly below
+ */
+ String newCommitTime = "0000001";
+ client.startCommitWithTime(newCommitTime);
+ List<HoodieRecord> records =
dataGenPartition3.generateInsertsContainsAllPartitions(newCommitTime, 2);
+ JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+ JavaRDD<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime);
+ Assertions.assertNoWriteErrors(statuses.collect());
+ client.commit(newCommitTime, statuses);
+
+ //2. Ingest inserts + upserts to partition 1 and 2. We will eventually
rollback both these commits using restore flow.
+ List<FileSlice> firstPartitionCommit2FileSlices = new ArrayList<>();
+ List<FileSlice> secondPartitionCommit2FileSlices = new ArrayList<>();
+ twoUpsertCommitDataWithTwoPartitions(firstPartitionCommit2FileSlices,
secondPartitionCommit2FileSlices, cfg, !isUsingMarkers);
+ List<HoodieLogFile> firstPartitionCommit2LogFiles = new ArrayList<>();
+ List<HoodieLogFile> secondPartitionCommit2LogFiles = new ArrayList<>();
+
firstPartitionCommit2FileSlices.get(0).getLogFiles().collect(Collectors.toList()).forEach(logFile
-> firstPartitionCommit2LogFiles.add(logFile));
+ assertEquals(1, firstPartitionCommit2LogFiles.size());
+
secondPartitionCommit2FileSlices.get(0).getLogFiles().collect(Collectors.toList()).forEach(logFile
-> secondPartitionCommit2LogFiles.add(logFile));
+ assertEquals(1, secondPartitionCommit2LogFiles.size());
+ HoodieTable table = this.getHoodieTable(metaClient, cfg);
+ // NOTE(review): isUsingMarkers (false) is reused as the first HoodieInstant constructor argument - confirm this is intended
+ //3. rollback delta commit 002 (the update to partition1 and partition2)
+ HoodieInstant rollBackInstant = new HoodieInstant(isUsingMarkers,
HoodieTimeline.DELTA_COMMIT_ACTION, "002");
+ BaseRollbackPlanActionExecutor mergeOnReadRollbackPlanActionExecutor =
+ new BaseRollbackPlanActionExecutor(context, cfg, table, "003",
rollBackInstant, false,
+ cfg.shouldRollbackUsingMarkers(), true);
+ mergeOnReadRollbackPlanActionExecutor.execute().get();
+ MergeOnReadRollbackActionExecutor mergeOnReadRollbackActionExecutor = new
MergeOnReadRollbackActionExecutor(
+ context,
+ cfg,
+ table,
+ "003",
+ rollBackInstant,
+ true,
+ false);
+ //4. assert the rollback stat: partition metadata must have exactly 2 entries
+ Map<String, HoodieRollbackPartitionMetadata> rollbackMetadata =
mergeOnReadRollbackActionExecutor.execute().getPartitionMetadata();
+ assertEquals(2, rollbackMetadata.size());
+ assertFalse(WriteMarkersFactory.get(cfg.getMarkersType(), table,
"002").doesMarkerDirExist());
+
+ //5. rollback 001 as well. this time, since it is part of the restore, the entire
file slice should be deleted and not just log files (for partition1 and
partition2)
+ HoodieInstant rollBackInstant1 = new HoodieInstant(isUsingMarkers,
HoodieTimeline.DELTA_COMMIT_ACTION, "001");
+ BaseRollbackPlanActionExecutor mergeOnReadRollbackPlanActionExecutor1 =
+ new BaseRollbackPlanActionExecutor(context, cfg, table, "004",
rollBackInstant1, false,
+ cfg.shouldRollbackUsingMarkers(), true);
+ mergeOnReadRollbackPlanActionExecutor1.execute().get();
+ MergeOnReadRollbackActionExecutor mergeOnReadRollbackActionExecutor1 = new
MergeOnReadRollbackActionExecutor(
+ context,
+ cfg,
+ table,
+ "004",
+ rollBackInstant1,
+ true,
+ false);
+ mergeOnReadRollbackActionExecutor1.execute().getPartitionMetadata();
+
+ //6. assert there are no valid file groups in either partition1 or partition2
+ assertEquals(0,
table.getFileSystemView().getAllFileGroups(DEFAULT_FIRST_PARTITION_PATH).count());
+ assertEquals(0,
table.getFileSystemView().getAllFileGroups(DEFAULT_SECOND_PARTITION_PATH).count());
+ // and only the 3rd partition should still have valid file groups.
+
assertTrue(table.getFileSystemView().getAllFileGroups(DEFAULT_THIRD_PARTITION_PATH).count()
> 0);
+ }
+
@Test
public void testRollbackForCanIndexLogFile() throws IOException {
//1. prepare data and assert data result