yihua commented on code in PR #6926:
URL: https://github.com/apache/hudi/pull/6926#discussion_r1002301042


##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanPlanExecutor.java:
##########
@@ -99,6 +99,206 @@ private static Stream<Arguments> 
argumentsForTestKeepLatestCommits() {
     );
   }
 
+  private static Stream<Arguments> argumentsForTestTriggerCleanEveryNCommits() 
{
+    return Stream.of(
+            Arguments.of(1),
+                Arguments.of(2),
+                Arguments.of(3),
+                Arguments.of(4)
+    );
+  }
+
+  @ParameterizedTest
+  @MethodSource("argumentsForTestTriggerCleanEveryNCommits")
+  public void testTriggerCleanEveryNthCommit(int minCommitsToTriggerClean) 
throws Exception {
+    HoodieWriteConfig config = 
HoodieWriteConfig.newBuilder().withPath(basePath)
+            
.withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).enable(false).build())
+            .withEmbeddedTimelineServerEnabled(false)
+            .withCleanConfig(HoodieCleanConfig.newBuilder()
+                    
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER)
+                    
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
+                    .retainCommits(1)
+                    .withMaxCommitsBeforeCleaning(minCommitsToTriggerClean)
+                    .build()).build();
+
+    HoodieTestTable testTable = HoodieTestTable.of(metaClient);

Review Comment:
   To run this test with the metadata table enabled, I believe you need to use 
`HoodieMetadataTestTable` instead of `HoodieTestTable`.  You can tackle this in a separate PR.



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanPlanExecutor.java:
##########
@@ -99,6 +99,206 @@ private static Stream<Arguments> 
argumentsForTestKeepLatestCommits() {
     );
   }
 
+  private static Stream<Arguments> argumentsForTestTriggerCleanEveryNCommits() 
{
+    return Stream.of(
+            Arguments.of(1),
+                Arguments.of(2),
+                Arguments.of(3),
+                Arguments.of(4)
+    );
+  }
+
+  @ParameterizedTest
+  @MethodSource("argumentsForTestTriggerCleanEveryNCommits")

Review Comment:
   nit: make the names consistent? The argument provider says `EveryNCommits` while the test method says `EveryNthCommit`.



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanPlanExecutor.java:
##########
@@ -99,6 +99,206 @@ private static Stream<Arguments> 
argumentsForTestKeepLatestCommits() {
     );
   }
 
+  private static Stream<Arguments> argumentsForTestTriggerCleanEveryNCommits() 
{
+    return Stream.of(
+            Arguments.of(1),
+                Arguments.of(2),
+                Arguments.of(3),
+                Arguments.of(4)
+    );
+  }
+
+  @ParameterizedTest
+  @MethodSource("argumentsForTestTriggerCleanEveryNCommits")
+  public void testTriggerCleanEveryNthCommit(int minCommitsToTriggerClean) 
throws Exception {
+    HoodieWriteConfig config = 
HoodieWriteConfig.newBuilder().withPath(basePath)
+            
.withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).enable(false).build())
+            .withEmbeddedTimelineServerEnabled(false)
+            .withCleanConfig(HoodieCleanConfig.newBuilder()
+                    
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER)
+                    
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
+                    .retainCommits(1)
+                    .withMaxCommitsBeforeCleaning(minCommitsToTriggerClean)
+                    .build()).build();
+
+    HoodieTestTable testTable = HoodieTestTable.of(metaClient);
+    String p0 = "2020/01/01";
+    String p1 = "2020/01/02";
+
+    // make 1 commit, with 1 file per partition
+    String file1P0C0 = UUID.randomUUID().toString();
+    String file1P1C0 = UUID.randomUUID().toString();
+    testTable.addInflightCommit("00000000000001").withBaseFilesInPartition(p0, 
file1P0C0).withBaseFilesInPartition(p1, file1P1C0);
+
+    HoodieCommitMetadata commitMetadata = 
generateCommitMetadata("00000000000001",
+            Collections.unmodifiableMap(new HashMap<String, List<String>>() {
+              {
+                put(p0, CollectionUtils.createImmutableList(file1P0C0));
+                put(p1, CollectionUtils.createImmutableList(file1P1C0));
+              }
+            })
+    );
+    metaClient.getActiveTimeline().saveAsComplete(
+            new HoodieInstant(HoodieInstant.State.INFLIGHT, 
HoodieTimeline.COMMIT_ACTION, "00000000000001"),
+            
Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
+
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    List<HoodieCleanStat> hoodieCleanStatsOne =
+            runCleaner(config, false, false, 2, true);
+    assertEquals(0, hoodieCleanStatsOne.size(), "Must not scan any partitions 
and clean any files");
+    assertTrue(testTable.baseFileExists(p0, "00000000000001", file1P0C0));
+    assertTrue(testTable.baseFileExists(p1, "00000000000001", file1P1C0));
+
+    // make next commit, with 1 insert & 1 update per partition
+    Map<String, String> partitionAndFileId002 = 
testTable.addInflightCommit("00000000000003").getFileIdsWithBaseFilesInPartitions(p0,
 p1);
+    String file2P0C1 = partitionAndFileId002.get(p0);
+    String file2P1C1 = partitionAndFileId002.get(p1);
+    testTable.forCommit("00000000000003").withBaseFilesInPartition(p0, 
file1P0C0).withBaseFilesInPartition(p1, file1P1C0);
+    commitMetadata = generateCommitMetadata("00000000000003", new 
HashMap<String, List<String>>() {
+      {
+        put(p0, CollectionUtils.createImmutableList(file1P0C0, file2P0C1));
+        put(p1, CollectionUtils.createImmutableList(file1P1C0, file2P1C1));
+      }
+    });
+    metaClient.getActiveTimeline().saveAsComplete(
+            new HoodieInstant(HoodieInstant.State.INFLIGHT, 
HoodieTimeline.COMMIT_ACTION, "00000000000003"),
+            
Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
+    List<HoodieCleanStat> hoodieCleanStatsTwo =
+            runCleaner(config, false, false, 4, true);
+    assertEquals(0, hoodieCleanStatsTwo.size(), "Must not clean any file. We 
have to keep 1 version before the latest commit time");
+    assertTrue(testTable.baseFileExists(p0, "00000000000003", file2P0C1));
+    assertTrue(testTable.baseFileExists(p1, "00000000000003", file2P1C1));
+    assertTrue(testTable.baseFileExists(p0, "00000000000001", file1P0C0));
+    assertTrue(testTable.baseFileExists(p1, "00000000000001", file1P1C0));
+
+    // make next commit, with 2 updates to existing files, and 1 insert
+    String file3P0C2 = testTable.addInflightCommit("00000000000005")
+            .withBaseFilesInPartition(p0, file1P0C0)
+            .withBaseFilesInPartition(p0, file2P0C1)
+            .getFileIdsWithBaseFilesInPartitions(p0).get(p0);
+    commitMetadata = generateCommitMetadata("00000000000005",
+            CollectionUtils.createImmutableMap(
+                    p0, CollectionUtils.createImmutableList(file1P0C0, 
file2P0C1, file3P0C2)));
+    metaClient.getActiveTimeline().saveAsComplete(
+            new HoodieInstant(HoodieInstant.State.INFLIGHT, 
HoodieTimeline.COMMIT_ACTION, "00000000000005"),
+            
Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
+
+    List<HoodieCleanStat> hoodieCleanStatsThree =
+            runCleaner(config, false, false, 6, true);
+
+    switch (minCommitsToTriggerClean) {
+      case 1:
+      case 2:
+      case 3:
+        assertEquals(2, hoodieCleanStatsThree.size(), "Must clean at least 1 
files");
+        assertFalse(testTable.baseFileExists(p0, "00000000000001", file1P0C0));
+        assertFalse(testTable.baseFileExists(p1, "00000000000001", file1P1C0));
+        break;
+      case 4:
+        assertEquals(0, hoodieCleanStatsThree.size(),
+                "Must not clean any file. Only 3 commits have occurred, need 
at least 4 for cleaning to trigger");
+        assertTrue(testTable.baseFileExists(p0, "00000000000001", file1P0C0));
+        break;
+      default:
+    }
+
+    // make next commit, with 2 updates to existing files, and 1 insert
+    String file4P0C3 = testTable.addInflightCommit("00000000000007")
+            .withBaseFilesInPartition(p0, file1P0C0)
+            .withBaseFilesInPartition(p0, file2P0C1)
+            .getFileIdsWithBaseFilesInPartitions(p0).get(p0);
+    commitMetadata = generateCommitMetadata("00000000000007",
+            CollectionUtils.createImmutableMap(
+                    p0, CollectionUtils.createImmutableList(file1P0C0, 
file2P0C1, file4P0C3)));
+    metaClient.getActiveTimeline().saveAsComplete(
+            new HoodieInstant(HoodieInstant.State.INFLIGHT, 
HoodieTimeline.COMMIT_ACTION, "00000000000007"),
+            
Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
+
+    assertTrue(testTable.baseFileExists(p0, "00000000000005", file1P0C0));
+    List<HoodieCleanStat> hoodieCleanStatsFour =
+            runCleaner(config, false, false, 8, true);
+    HoodieCleanStat partitionZeroCleanStat = 
getCleanStat(hoodieCleanStatsFour, p0);
+    HoodieCleanStat partitionOneCleanStat = getCleanStat(hoodieCleanStatsFour, 
p1);
+
+    switch (minCommitsToTriggerClean) {
+      case 1:
+        assertEquals(2, partitionZeroCleanStat.getSuccessDeleteFiles().size(), 
"Must clean at least one old file");
+        assertFalse(testTable.baseFileExists(p0, "00000000000001", file1P0C0));
+        assertFalse(testTable.baseFileExists(p0, "00000000000003", file1P0C0));
+        assertFalse(testTable.baseFileExists(p0, "00000000000003", file2P0C1));
+        assertTrue(testTable.baseFileExists(p0, "00000000000005", file1P0C0));
+        assertTrue(testTable.baseFileExists(p0, "00000000000005", file2P0C1));
+        break;
+      case 2:
+      case 3:
+        assertEquals(0, hoodieCleanStatsFour.size(),
+                "Must not clean any file. Only one commit has happened since 
the last clean!");
+        break;
+      case 4:
+        assertEquals(3, partitionZeroCleanStat.getSuccessDeleteFiles().size(), 
"Must clean at least one old file");
+        assertEquals(1, partitionOneCleanStat.getSuccessDeleteFiles().size());
+        assertFalse(testTable.baseFileExists(p0, "00000000000001", file1P0C0));
+        assertFalse(testTable.baseFileExists(p1, "00000000000001", file1P1C0));
+        assertFalse(testTable.baseFileExists(p0, "00000000000003", file1P0C0));
+        assertFalse(testTable.baseFileExists(p0, "00000000000003", file2P0C1));
+        assertTrue(testTable.baseFileExists(p0, "00000000000005", file1P0C0));
+        assertTrue(testTable.baseFileExists(p0, "00000000000005", file2P0C1));
+        break;
+      default:
+    }
+
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+
+    String file5P0C4 = testTable.addInflightCommit("00000000000009")
+            .withBaseFilesInPartition(p0, file1P0C0)
+            .withBaseFilesInPartition(p0, file2P0C1)
+            .getFileIdsWithBaseFilesInPartitions(p0).get(p0);
+    commitMetadata = generateCommitMetadata("00000000000009", 
CollectionUtils.createImmutableMap(
+            p0, CollectionUtils.createImmutableList(file1P0C0, file2P0C1, 
file5P0C4)));
+    metaClient.getActiveTimeline().saveAsComplete(
+            new HoodieInstant(HoodieInstant.State.INFLIGHT, 
HoodieTimeline.COMMIT_ACTION, "00000000000009"),
+            
Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
+
+    List<HoodieCleanStat> hoodieCleanStatsFive =
+            runCleaner(config, false, false, 10, true);
+
+    switch (minCommitsToTriggerClean) {
+      case 1:
+        assertEquals(1, hoodieCleanStatsFive.size(), "Must clean at least one 
old file");
+        assertFalse(testTable.baseFileExists(p0, "00000000000001", file1P0C0));
+        assertFalse(testTable.baseFileExists(p0, "00000000000003", file1P0C0));
+        assertFalse(testTable.baseFileExists(p0, "00000000000005", file1P0C0));
+        assertFalse(testTable.baseFileExists(p0, "00000000000005", file2P0C1));
+        assertTrue(testTable.baseFileExists(p0, "00000000000005", file3P0C2));
+        assertTrue(testTable.baseFileExists(p0, "00000000000007", file1P0C0));
+        break;
+      case 2:
+        assertEquals(1, hoodieCleanStatsFive.size(), "Must clean files written 
in second and third commits");
+        assertFalse(testTable.baseFileExists(p0, "00000000000003", file1P0C0));
+        assertFalse(testTable.baseFileExists(p0, "00000000000003", file2P0C1));
+        assertFalse(testTable.baseFileExists(p0, "00000000000005", file1P0C0));
+        assertFalse(testTable.baseFileExists(p0, "00000000000005", file2P0C1));
+        assertTrue(testTable.baseFileExists(p0, "00000000000005", file3P0C2));
+        assertTrue(testTable.baseFileExists(p0, "00000000000007", file1P0C0));
+        break;
+      case 3:
+        assertEquals(0, hoodieCleanStatsFive.size(),
+                "Must not clean any file. Only two commits have happened since 
the last clean!");
+        break;
+      case 4:
+        assertEquals(0, hoodieCleanStatsFive.size(),
+                "Must not clean any file. Only one commit has happened since 
the last clean!");
+        break;
+      default:
+    }
+  }
+
+  @ParameterizedTest
+  @MethodSource("argumentsForTestTriggerCleanEveryNCommits")
+  public void testTriggerCleanEveryNthCommitWithMetadataTableEnabled(int 
minCommitsToTriggerClean) throws Exception {
+    //TODO: this has some bug and needs to be checked once.
+  }

Review Comment:
   Could you remove this placeholder test if it is not used now?



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanPlanExecutor.java:
##########
@@ -99,6 +99,206 @@ private static Stream<Arguments> 
argumentsForTestKeepLatestCommits() {
     );
   }
 
+  private static Stream<Arguments> argumentsForTestTriggerCleanEveryNCommits() 
{
+    return Stream.of(
+            Arguments.of(1),
+                Arguments.of(2),
+                Arguments.of(3),
+                Arguments.of(4)

Review Comment:
   nit: indentation (the `Arguments.of(...)` lines are not aligned with each other)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to