jsbali commented on a change in pull request #2677: URL: https://github.com/apache/hudi/pull/2677#discussion_r600463420
########## File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java ########## @@ -491,6 +519,166 @@ public void testConvertCommitMetadata() { assertEquals(expectedCommitMetadata.getOperationType(), WriteOperationType.INSERT.toString()); } + @Test + public void testArchiveCompletedClean() throws IOException { + HoodieWriteConfig cfg = + HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA) + .withParallelism(2, 2).forTable("test-trip-table") + .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build()) + .build(); + metaClient = HoodieTableMetaClient.reload(metaClient); + + createCleanMetadata("10", false); + createCleanMetadata("11", false); + createCleanMetadata("12", false); + HoodieInstant notArchivedInstant1 = new HoodieInstant(State.COMPLETED, "clean", "12"); + createCleanMetadata("13", false); + HoodieInstant notArchivedInstant2 = new HoodieInstant(State.COMPLETED, "clean", "13"); + + HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient); + HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, table); + + archiveLog.archiveIfRequired(context); + + List<HoodieInstant> notArchivedInstants = metaClient.getActiveTimeline().reload().getInstants().collect(Collectors.toList()); + //There will be 3 * 2 files but due to TimelineLayoutV1 this will show as 2. + assertEquals(2, notArchivedInstants.size(), "Not archived instants should be 2"); + assertEquals(notArchivedInstants, Arrays.asList(notArchivedInstant1, notArchivedInstant2), ""); + } + + @Test + public void testArchiveCompletedRollback() throws IOException { + HoodieWriteConfig cfg = + HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA) + .withParallelism(2, 2).forTable("test-trip-table") + .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build()) + .build(); + metaClient = HoodieTableMetaClient.reload(metaClient); + + createCommitAndRollbackFile("6", "10", false); + createCommitAndRollbackFile("8", "11", false); + createCommitAndRollbackFile("7", "12", false); + HoodieInstant notArchivedInstant1 = new HoodieInstant(State.COMPLETED, "rollback", "12"); + + createCommitAndRollbackFile("5", "13", false); + HoodieInstant notArchivedInstant2 = new HoodieInstant(State.COMPLETED, "rollback", "13"); + + HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient); + HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, table); + + archiveLog.archiveIfRequired(context); + + List<HoodieInstant> notArchivedInstants = metaClient.getActiveTimeline().reload().getRollbackTimeline().getInstants().collect(Collectors.toList()); + //There will be 2 * 2 files but due to TimelineLayoutV1 this will show as 2. + assertEquals(2, notArchivedInstants.size(), "Not archived instants should be 2"); + assertEquals(notArchivedInstants, Arrays.asList(notArchivedInstant1, notArchivedInstant2), ""); + } + + @Test + public void testArchiveCompletedShouldRetainMinInstantsIfInstantsGreaterThanMaxtoKeep() throws IOException { + int minInstants = 2; + int maxInstants = 10; + HoodieWriteConfig cfg = + HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA) + .withParallelism(2, 2).forTable("test-trip-table") + .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(minInstants, maxInstants).build()) + .build(); + metaClient = HoodieTableMetaClient.reload(metaClient); + for (int i = 0; i < maxInstants + 2; i++) { + createCleanMetadata(i + "", false); + } + + HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient); + HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, table); + + archiveLog.archiveIfRequired(context); + assertEquals(minInstants, metaClient.getActiveTimeline().reload().getInstants().count()); + } + + @Test + public void testArchiveCompletedShouldNotArchiveIfInstantsLessThanMaxtoKeep() throws IOException { + int minInstants = 2; + int maxInstants = 10; + HoodieWriteConfig cfg = + HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA) + .withParallelism(2, 2).forTable("test-trip-table") + .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(minInstants, maxInstants).build()) + .build(); + metaClient = HoodieTableMetaClient.reload(metaClient); + for (int i = 0; i < maxInstants; i++) { + createCleanMetadata(i + "", false); + } + + HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient); + HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, table); + + archiveLog.archiveIfRequired(context); + assertEquals(maxInstants, metaClient.getActiveTimeline().reload().getInstants().count()); + } + + @Test + public void testArchiveCompletedRollbackAndClean() throws IOException { + int minInstantsToKeep = 2; + int maxInstantsToKeep = 10; + HoodieWriteConfig cfg = + HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA) + .withParallelism(2, 2).forTable("test-trip-table") + .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(minInstantsToKeep, maxInstantsToKeep).build()) + .build(); + metaClient = HoodieTableMetaClient.reload(metaClient); + + int startInstant = 1; + for (int i = 0; i < maxInstantsToKeep + 1; i++, startInstant++) { + createCleanMetadata(startInstant + "", false); + } + + for (int i = 0; i < maxInstantsToKeep + 1; i++, startInstant += 2) { + createCommitAndRollbackFile(startInstant + 1 + "", startInstant + "", false); + } + + HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient); + HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, table); + + archiveLog.archiveIfRequired(context); + + Stream<HoodieInstant> currentInstants = metaClient.getActiveTimeline().reload().getInstants(); + Map<Object, List<HoodieInstant>> actionInstantMap = currentInstants.collect(Collectors.groupingBy(HoodieInstant::getAction)); + + assertTrue(actionInstantMap.containsKey("clean"), "Clean Action key must be preset"); + assertEquals(minInstantsToKeep, actionInstantMap.get("clean").size(), "Should have min instant"); + + assertTrue(actionInstantMap.containsKey("rollback"), "Rollback Action key must be preset"); + assertEquals(minInstantsToKeep, actionInstantMap.get("rollback").size(), "Should have min instant"); + } + + @Test + public void testArchiveInflightRollbackAndClean() throws IOException { Review comment: Fixed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org