jsbali commented on a change in pull request #2677:
URL: https://github.com/apache/hudi/pull/2677#discussion_r600463420
##########
File path:
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java
##########
@@ -491,6 +519,166 @@ public void testConvertCommitMetadata() {
assertEquals(expectedCommitMetadata.getOperationType(),
WriteOperationType.INSERT.toString());
}
+ @Test
+ public void testArchiveCompletedClean() throws IOException {
+ HoodieWriteConfig cfg =
+ HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
+ .withParallelism(2, 2).forTable("test-trip-table")
+ .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build())
+ .build();
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+
+ createCleanMetadata("10", false);
+ createCleanMetadata("11", false);
+ createCleanMetadata("12", false);
+ HoodieInstant notArchivedInstant1 = new HoodieInstant(State.COMPLETED, "clean", "12");
+ createCleanMetadata("13", false);
+ HoodieInstant notArchivedInstant2 = new HoodieInstant(State.COMPLETED, "clean", "13");
+
+ HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
+ HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, table);
+
+ archiveLog.archiveIfRequired(context);
+
+ List<HoodieInstant> notArchivedInstants = metaClient.getActiveTimeline().reload().getInstants().collect(Collectors.toList());
+ //There will be 3 * 2 files but due to TimelineLayoutV1 this will show as 2.
+ assertEquals(2, notArchivedInstants.size(), "Not archived instants should be 2");
+ assertEquals(notArchivedInstants, Arrays.asList(notArchivedInstant1, notArchivedInstant2), "");
+ }
+
+ @Test
+ public void testArchiveCompletedRollback() throws IOException {
+ HoodieWriteConfig cfg =
+ HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
+ .withParallelism(2, 2).forTable("test-trip-table")
+ .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build())
+ .build();
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+
+ createCommitAndRollbackFile("6", "10", false);
+ createCommitAndRollbackFile("8", "11", false);
+ createCommitAndRollbackFile("7", "12", false);
+ HoodieInstant notArchivedInstant1 = new HoodieInstant(State.COMPLETED, "rollback", "12");
+
+ createCommitAndRollbackFile("5", "13", false);
+ HoodieInstant notArchivedInstant2 = new HoodieInstant(State.COMPLETED, "rollback", "13");
+
+ HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
+ HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, table);
+
+ archiveLog.archiveIfRequired(context);
+
+ List<HoodieInstant> notArchivedInstants = metaClient.getActiveTimeline().reload().getRollbackTimeline().getInstants().collect(Collectors.toList());
+ //There will be 2 * 2 files but due to TimelineLayoutV1 this will show as 2.
+ assertEquals(2, notArchivedInstants.size(), "Not archived instants should be 2");
+ assertEquals(notArchivedInstants, Arrays.asList(notArchivedInstant1, notArchivedInstant2), "");
+ }
+
+ @Test
+ public void testArchiveCompletedShouldRetainMinInstantsIfInstantsGreaterThanMaxtoKeep() throws IOException {
+ int minInstants = 2;
+ int maxInstants = 10;
+ HoodieWriteConfig cfg =
+ HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
+ .withParallelism(2, 2).forTable("test-trip-table")
+ .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(minInstants, maxInstants).build())
+ .build();
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ for (int i = 0; i < maxInstants + 2; i++) {
+ createCleanMetadata(i + "", false);
+ }
+
+ HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
+ HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, table);
+
+ archiveLog.archiveIfRequired(context);
+ assertEquals(minInstants, metaClient.getActiveTimeline().reload().getInstants().count());
+ }
+
+ @Test
+ public void testArchiveCompletedShouldNotArchiveIfInstantsLessThanMaxtoKeep() throws IOException {
+ int minInstants = 2;
+ int maxInstants = 10;
+ HoodieWriteConfig cfg =
+ HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
+ .withParallelism(2, 2).forTable("test-trip-table")
+ .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(minInstants, maxInstants).build())
+ .build();
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ for (int i = 0; i < maxInstants; i++) {
+ createCleanMetadata(i + "", false);
+ }
+
+ HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
+ HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, table);
+
+ archiveLog.archiveIfRequired(context);
+ assertEquals(maxInstants, metaClient.getActiveTimeline().reload().getInstants().count());
+ }
+
+ @Test
+ public void testArchiveCompletedRollbackAndClean() throws IOException {
+ int minInstantsToKeep = 2;
+ int maxInstantsToKeep = 10;
+ HoodieWriteConfig cfg =
+ HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
+ .withParallelism(2, 2).forTable("test-trip-table")
+ .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(minInstantsToKeep, maxInstantsToKeep).build())
+ .build();
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+
+ int startInstant = 1;
+ for (int i = 0; i < maxInstantsToKeep + 1; i++, startInstant++) {
+ createCleanMetadata(startInstant + "", false);
+ }
+
+ for (int i = 0; i < maxInstantsToKeep + 1; i++, startInstant += 2) {
+ createCommitAndRollbackFile(startInstant + 1 + "", startInstant + "", false);
+ }
+
+ HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
+ HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, table);
+
+ archiveLog.archiveIfRequired(context);
+
+ Stream<HoodieInstant> currentInstants = metaClient.getActiveTimeline().reload().getInstants();
+ Map<Object, List<HoodieInstant>> actionInstantMap = currentInstants.collect(Collectors.groupingBy(HoodieInstant::getAction));
+
+ assertTrue(actionInstantMap.containsKey("clean"), "Clean Action key must be preset");
+ assertEquals(minInstantsToKeep, actionInstantMap.get("clean").size(), "Should have min instant");
+
+ assertTrue(actionInstantMap.containsKey("rollback"), "Rollback Action key must be preset");
+ assertEquals(minInstantsToKeep, actionInstantMap.get("rollback").size(), "Should have min instant");
+ }
+
+ @Test
+ public void testArchiveInflightRollbackAndClean() throws IOException {
Review comment:
Fixed
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org