satishkotha commented on a change in pull request #2677:
URL: https://github.com/apache/hudi/pull/2677#discussion_r599817481



##########
File path: 
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java
##########
@@ -491,6 +519,166 @@ public void testConvertCommitMetadata() {
     assertEquals(expectedCommitMetadata.getOperationType(), 
WriteOperationType.INSERT.toString());
   }
 
+  @Test
+  public void testArchiveCompletedClean() throws IOException {
+    HoodieWriteConfig cfg =
+            
HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
+                    .withParallelism(2, 2).forTable("test-trip-table")
+                    
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2,
 3).build())
+                    .build();
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+
+    createCleanMetadata("10", false);
+    createCleanMetadata("11", false);
+    createCleanMetadata("12", false);
+    HoodieInstant notArchivedInstant1 = new HoodieInstant(State.COMPLETED, 
"clean", "12");
+    createCleanMetadata("13", false);
+    HoodieInstant notArchivedInstant2 = new HoodieInstant(State.COMPLETED, 
"clean", "13");
+
+    HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
+    HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, 
table);
+
+    archiveLog.archiveIfRequired(context);
+
+    List<HoodieInstant> notArchivedInstants = 
metaClient.getActiveTimeline().reload().getInstants().collect(Collectors.toList());
+    //There will be 3 * 2 files but due to TimelineLayoutV1 this will show as 
2.
+    assertEquals(2, notArchivedInstants.size(), "Not archived instants should 
be 2");
+    assertEquals(notArchivedInstants, Arrays.asList(notArchivedInstant1, 
notArchivedInstant2), "");
+  }
+
+  @Test
+  public void testArchiveCompletedRollback() throws IOException {
+    HoodieWriteConfig cfg =
+            
HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
+                    .withParallelism(2, 2).forTable("test-trip-table")
+                    
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2,
 3).build())
+                    .build();
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+
+    createCommitAndRollbackFile("6", "10", false);
+    createCommitAndRollbackFile("8", "11", false);
+    createCommitAndRollbackFile("7", "12", false);
+    HoodieInstant notArchivedInstant1 = new HoodieInstant(State.COMPLETED, 
"rollback", "12");
+
+    createCommitAndRollbackFile("5", "13", false);
+    HoodieInstant notArchivedInstant2 = new HoodieInstant(State.COMPLETED, 
"rollback", "13");
+
+    HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
+    HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, 
table);
+
+    archiveLog.archiveIfRequired(context);
+
+    List<HoodieInstant> notArchivedInstants = 
metaClient.getActiveTimeline().reload().getRollbackTimeline().getInstants().collect(Collectors.toList());
+    //There will be 2 * 2 files but due to TimelineLayoutV1 this will show as 
2.
+    assertEquals(2, notArchivedInstants.size(), "Not archived instants should 
be 2");
+    assertEquals(notArchivedInstants, Arrays.asList(notArchivedInstant1, 
notArchivedInstant2), "");
+  }
+
+  @Test
+  public void 
testArchiveCompletedShouldRetainMinInstantsIfInstantsGreaterThanMaxtoKeep() 
throws IOException {
+    int minInstants = 2;
+    int maxInstants = 10;
+    HoodieWriteConfig cfg =
+            
HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
+                    .withParallelism(2, 2).forTable("test-trip-table")
+                    
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(minInstants,
 maxInstants).build())
+                    .build();
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    for (int i = 0; i < maxInstants + 2; i++) {
+      createCleanMetadata(i + "", false);
+    }
+
+    HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
+    HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, 
table);
+
+    archiveLog.archiveIfRequired(context);
+    assertEquals(minInstants, 
metaClient.getActiveTimeline().reload().getInstants().count());
+  }
+
+  @Test
+  public void 
testArchiveCompletedShouldNotArchiveIfInstantsLessThanMaxtoKeep() throws 
IOException {
+    int minInstants = 2;
+    int maxInstants = 10;
+    HoodieWriteConfig cfg =
+            
HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
+                    .withParallelism(2, 2).forTable("test-trip-table")
+                    
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(minInstants,
 maxInstants).build())
+                    .build();
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    for (int i = 0; i < maxInstants; i++) {
+      createCleanMetadata(i + "", false);
+    }
+
+    HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
+    HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, 
table);
+
+    archiveLog.archiveIfRequired(context);
+    assertEquals(maxInstants, 
metaClient.getActiveTimeline().reload().getInstants().count());
+  }
+
+  @Test
+  public void testArchiveCompletedRollbackAndClean() throws IOException {
+    int minInstantsToKeep = 2;
+    int maxInstantsToKeep = 10;
+    HoodieWriteConfig cfg =
+            
HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
+                    .withParallelism(2, 2).forTable("test-trip-table")
+                    
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(minInstantsToKeep,
 maxInstantsToKeep).build())
+                    .build();
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+
+    int startInstant = 1;
+    for (int i = 0; i < maxInstantsToKeep + 1; i++, startInstant++) {
+      createCleanMetadata(startInstant + "", false);
+    }
+
+    for (int i = 0; i < maxInstantsToKeep + 1; i++, startInstant += 2) {
+      createCommitAndRollbackFile(startInstant + 1 + "", startInstant + "", 
false);
+    }
+
+    HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
+    HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, 
table);
+
+    archiveLog.archiveIfRequired(context);
+
+    Stream<HoodieInstant> currentInstants = 
metaClient.getActiveTimeline().reload().getInstants();
+    Map<Object, List<HoodieInstant>> actionInstantMap = 
currentInstants.collect(Collectors.groupingBy(HoodieInstant::getAction));
+
+    assertTrue(actionInstantMap.containsKey("clean"), "Clean Action key must 
be preset");
+    assertEquals(minInstantsToKeep, actionInstantMap.get("clean").size(), 
"Should have min instant");
+
+    assertTrue(actionInstantMap.containsKey("rollback"), "Rollback Action key 
must be preset");
+    assertEquals(minInstantsToKeep, actionInstantMap.get("rollback").size(), 
"Should have min instant");
+  }
+
+  @Test
+  public void testArchiveInflightRollbackAndClean() throws IOException {
+    HoodieWriteConfig cfg =
+            
HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
+                    .withParallelism(2, 2).forTable("test-trip-table")
+                    
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2,
 3).build())
+                    .build();
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+
+    createCleanMetadata("10", false);
+    createCleanMetadata("11", false);
+    createCleanMetadata("12", false);
+    HoodieInstant notArchivedInstant1 = new HoodieInstant(State.COMPLETED, 
"clean", "12");

Review comment:
       I see this pattern in many places. Do you think we can change 
'createCleanMetadata' to return the HoodieInstant it created? That would avoid 
code repetition and keep it clean.

##########
File path: 
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java
##########
@@ -388,6 +391,31 @@ public void testArchiveCommitSavepointNoHole() throws 
IOException {
         "Archived commits should always be safe");
   }
 
+  @Test
+  public void testArchiveRollbacks() throws IOException {
+    HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(basePath)
+            
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 
2).forTable("test-trip-table")
+            
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2,
 3).build())
+            .build();
+
+    createCommitAndRollbackFile("100", "101", false);
+    createCommitAndRollbackFile("102", "103", false);
+    createCommitAndRollbackFile("104", "105", false);
+    createCommitAndRollbackFile("106", "107", false);
+
+    HoodieTable table = HoodieSparkTable.create(cfg, context);
+    HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, 
table);
+
+    assertTrue(archiveLog.archiveIfRequired(context));
+    HoodieTimeline timeline = 
metaClient.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants();
+    assertEquals(2, timeline.countInstants(),
+            "first two commits must have been archived");
+    assertFalse(metaClient.getActiveTimeline().containsInstant(new 
HoodieInstant(false, HoodieTimeline.ROLLBACK_ACTION, "101")),
+            "first rollback must have been archived");
+    assertFalse(metaClient.getActiveTimeline().containsInstant(new 
HoodieInstant(false, HoodieTimeline.ROLLBACK_ACTION, "103")),

Review comment:
       Can you also assert that the other rollback files are present?

##########
File path: 
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java
##########
@@ -491,6 +519,166 @@ public void testConvertCommitMetadata() {
     assertEquals(expectedCommitMetadata.getOperationType(), 
WriteOperationType.INSERT.toString());
   }
 
+  @Test
+  public void testArchiveCompletedClean() throws IOException {
+    HoodieWriteConfig cfg =
+            
HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
+                    .withParallelism(2, 2).forTable("test-trip-table")
+                    
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2,
 3).build())
+                    .build();
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+
+    createCleanMetadata("10", false);
+    createCleanMetadata("11", false);
+    createCleanMetadata("12", false);
+    HoodieInstant notArchivedInstant1 = new HoodieInstant(State.COMPLETED, 
"clean", "12");
+    createCleanMetadata("13", false);
+    HoodieInstant notArchivedInstant2 = new HoodieInstant(State.COMPLETED, 
"clean", "13");
+
+    HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
+    HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, 
table);
+
+    archiveLog.archiveIfRequired(context);
+
+    List<HoodieInstant> notArchivedInstants = 
metaClient.getActiveTimeline().reload().getInstants().collect(Collectors.toList());
+    //There will be 3 * 2 files but due to TimelineLayoutV1 this will show as 
2.
+    assertEquals(2, notArchivedInstants.size(), "Not archived instants should 
be 2");
+    assertEquals(notArchivedInstants, Arrays.asList(notArchivedInstant1, 
notArchivedInstant2), "");
+  }
+
+  @Test
+  public void testArchiveCompletedRollback() throws IOException {
+    HoodieWriteConfig cfg =
+            
HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
+                    .withParallelism(2, 2).forTable("test-trip-table")
+                    
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2,
 3).build())
+                    .build();
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+
+    createCommitAndRollbackFile("6", "10", false);
+    createCommitAndRollbackFile("8", "11", false);
+    createCommitAndRollbackFile("7", "12", false);
+    HoodieInstant notArchivedInstant1 = new HoodieInstant(State.COMPLETED, 
"rollback", "12");
+
+    createCommitAndRollbackFile("5", "13", false);
+    HoodieInstant notArchivedInstant2 = new HoodieInstant(State.COMPLETED, 
"rollback", "13");
+
+    HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
+    HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, 
table);
+
+    archiveLog.archiveIfRequired(context);
+
+    List<HoodieInstant> notArchivedInstants = 
metaClient.getActiveTimeline().reload().getRollbackTimeline().getInstants().collect(Collectors.toList());
+    //There will be 2 * 2 files but due to TimelineLayoutV1 this will show as 
2.
+    assertEquals(2, notArchivedInstants.size(), "Not archived instants should 
be 2");
+    assertEquals(notArchivedInstants, Arrays.asList(notArchivedInstant1, 
notArchivedInstant2), "");
+  }
+
+  @Test
+  public void 
testArchiveCompletedShouldRetainMinInstantsIfInstantsGreaterThanMaxtoKeep() 
throws IOException {
+    int minInstants = 2;
+    int maxInstants = 10;
+    HoodieWriteConfig cfg =
+            
HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
+                    .withParallelism(2, 2).forTable("test-trip-table")
+                    
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(minInstants,
 maxInstants).build())
+                    .build();
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    for (int i = 0; i < maxInstants + 2; i++) {
+      createCleanMetadata(i + "", false);
+    }
+
+    HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
+    HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, 
table);
+
+    archiveLog.archiveIfRequired(context);
+    assertEquals(minInstants, 
metaClient.getActiveTimeline().reload().getInstants().count());
+  }
+
+  @Test
+  public void 
testArchiveCompletedShouldNotArchiveIfInstantsLessThanMaxtoKeep() throws 
IOException {
+    int minInstants = 2;
+    int maxInstants = 10;
+    HoodieWriteConfig cfg =
+            
HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
+                    .withParallelism(2, 2).forTable("test-trip-table")
+                    
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(minInstants,
 maxInstants).build())
+                    .build();
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    for (int i = 0; i < maxInstants; i++) {
+      createCleanMetadata(i + "", false);
+    }
+
+    HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
+    HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, 
table);
+
+    archiveLog.archiveIfRequired(context);
+    assertEquals(maxInstants, 
metaClient.getActiveTimeline().reload().getInstants().count());
+  }
+
+  @Test
+  public void testArchiveCompletedRollbackAndClean() throws IOException {

Review comment:
       I see a lot of tests, which is good. But if I read correctly, some of 
these are repetitive. Can you comment out this test and see if coverage 
changes? CI has been a pain, so we want to make sure to keep only tests that 
add value.

##########
File path: 
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java
##########
@@ -491,6 +519,166 @@ public void testConvertCommitMetadata() {
     assertEquals(expectedCommitMetadata.getOperationType(), 
WriteOperationType.INSERT.toString());
   }
 
+  @Test
+  public void testArchiveCompletedClean() throws IOException {
+    HoodieWriteConfig cfg =
+            
HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
+                    .withParallelism(2, 2).forTable("test-trip-table")
+                    
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2,
 3).build())
+                    .build();
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+
+    createCleanMetadata("10", false);
+    createCleanMetadata("11", false);
+    createCleanMetadata("12", false);
+    HoodieInstant notArchivedInstant1 = new HoodieInstant(State.COMPLETED, 
"clean", "12");
+    createCleanMetadata("13", false);
+    HoodieInstant notArchivedInstant2 = new HoodieInstant(State.COMPLETED, 
"clean", "13");
+
+    HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
+    HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, 
table);
+
+    archiveLog.archiveIfRequired(context);
+
+    List<HoodieInstant> notArchivedInstants = 
metaClient.getActiveTimeline().reload().getInstants().collect(Collectors.toList());
+    //There will be 3 * 2 files but due to TimelineLayoutV1 this will show as 
2.
+    assertEquals(2, notArchivedInstants.size(), "Not archived instants should 
be 2");
+    assertEquals(notArchivedInstants, Arrays.asList(notArchivedInstant1, 
notArchivedInstant2), "");
+  }
+
+  @Test
+  public void testArchiveCompletedRollback() throws IOException {
+    HoodieWriteConfig cfg =
+            
HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
+                    .withParallelism(2, 2).forTable("test-trip-table")
+                    
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2,
 3).build())
+                    .build();
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+
+    createCommitAndRollbackFile("6", "10", false);
+    createCommitAndRollbackFile("8", "11", false);
+    createCommitAndRollbackFile("7", "12", false);
+    HoodieInstant notArchivedInstant1 = new HoodieInstant(State.COMPLETED, 
"rollback", "12");
+
+    createCommitAndRollbackFile("5", "13", false);
+    HoodieInstant notArchivedInstant2 = new HoodieInstant(State.COMPLETED, 
"rollback", "13");
+
+    HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
+    HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, 
table);
+
+    archiveLog.archiveIfRequired(context);
+
+    List<HoodieInstant> notArchivedInstants = 
metaClient.getActiveTimeline().reload().getRollbackTimeline().getInstants().collect(Collectors.toList());
+    //There will be 2 * 2 files but due to TimelineLayoutV1 this will show as 
2.
+    assertEquals(2, notArchivedInstants.size(), "Not archived instants should 
be 2");
+    assertEquals(notArchivedInstants, Arrays.asList(notArchivedInstant1, 
notArchivedInstant2), "");
+  }
+
+  @Test
+  public void 
testArchiveCompletedShouldRetainMinInstantsIfInstantsGreaterThanMaxtoKeep() 
throws IOException {
+    int minInstants = 2;
+    int maxInstants = 10;
+    HoodieWriteConfig cfg =
+            
HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
+                    .withParallelism(2, 2).forTable("test-trip-table")
+                    
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(minInstants,
 maxInstants).build())
+                    .build();
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    for (int i = 0; i < maxInstants + 2; i++) {
+      createCleanMetadata(i + "", false);
+    }
+
+    HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
+    HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, 
table);
+
+    archiveLog.archiveIfRequired(context);
+    assertEquals(minInstants, 
metaClient.getActiveTimeline().reload().getInstants().count());
+  }
+
+  @Test
+  public void 
testArchiveCompletedShouldNotArchiveIfInstantsLessThanMaxtoKeep() throws 
IOException {
+    int minInstants = 2;
+    int maxInstants = 10;
+    HoodieWriteConfig cfg =
+            
HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
+                    .withParallelism(2, 2).forTable("test-trip-table")
+                    
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(minInstants,
 maxInstants).build())
+                    .build();
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    for (int i = 0; i < maxInstants; i++) {
+      createCleanMetadata(i + "", false);
+    }
+
+    HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
+    HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, 
table);
+
+    archiveLog.archiveIfRequired(context);
+    assertEquals(maxInstants, 
metaClient.getActiveTimeline().reload().getInstants().count());
+  }
+
+  @Test
+  public void testArchiveCompletedRollbackAndClean() throws IOException {
+    int minInstantsToKeep = 2;
+    int maxInstantsToKeep = 10;
+    HoodieWriteConfig cfg =
+            
HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
+                    .withParallelism(2, 2).forTable("test-trip-table")
+                    
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(minInstantsToKeep,
 maxInstantsToKeep).build())
+                    .build();
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+
+    int startInstant = 1;
+    for (int i = 0; i < maxInstantsToKeep + 1; i++, startInstant++) {
+      createCleanMetadata(startInstant + "", false);
+    }
+
+    for (int i = 0; i < maxInstantsToKeep + 1; i++, startInstant += 2) {
+      createCommitAndRollbackFile(startInstant + 1 + "", startInstant + "", 
false);
+    }
+
+    HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
+    HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, 
table);
+
+    archiveLog.archiveIfRequired(context);
+
+    Stream<HoodieInstant> currentInstants = 
metaClient.getActiveTimeline().reload().getInstants();
+    Map<Object, List<HoodieInstant>> actionInstantMap = 
currentInstants.collect(Collectors.groupingBy(HoodieInstant::getAction));
+
+    assertTrue(actionInstantMap.containsKey("clean"), "Clean Action key must 
be preset");
+    assertEquals(minInstantsToKeep, actionInstantMap.get("clean").size(), 
"Should have min instant");
+
+    assertTrue(actionInstantMap.containsKey("rollback"), "Rollback Action key 
must be preset");
+    assertEquals(minInstantsToKeep, actionInstantMap.get("rollback").size(), 
"Should have min instant");
+  }
+
+  @Test
+  public void testArchiveInflightRollbackAndClean() throws IOException {

Review comment:
       Does this test include rollbacks? Can you change the name appropriately?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to