This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 58b53f05980 [HUDI-4945] Add a test case for batch clean (#6845)
58b53f05980 is described below

commit 58b53f0598015189b62eeb3417132bb1ae71d0bd
Author: HunterXHunter <[email protected]>
AuthorDate: Mon Jun 24 07:12:19 2024 +0800

    [HUDI-4945] Add a test case for batch clean (#6845)
    
    Co-authored-by: Y Ethan Guo <[email protected]>
---
 .../apache/hudi/table/ITTestHoodieDataSource.java  | 22 ++++++++++++++++++++++
 1 file changed, 22 insertions(+)

diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index 884e3a99cf4..d9f3badf1fb 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -447,6 +447,28 @@ public class ITTestHoodieDataSource {
         "some commits should be cleaned");
   }
 
+  @Test
+  void testBatchWriteWithCleaning() {
+    String hoodieTableDDL = sql("t1")
+            .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+            .option(FlinkOptions.CLEAN_RETAIN_COMMITS, 1)
+            .end();
+    batchTableEnv.executeSql(hoodieTableDDL);
+    String insertInto = "insert into t1 values\n"
+            + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1')";
+    execInsertSql(batchTableEnv, insertInto);
+    execInsertSql(batchTableEnv, insertInto);
+    execInsertSql(batchTableEnv, insertInto);
+    Configuration defaultConf = 
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    Map<String, String> options1 = new HashMap<>(defaultConf.toMap());
+    options1.put(FlinkOptions.TABLE_NAME.key(), "t1");
+    Configuration conf = Configuration.fromMap(options1);
+    HoodieTimeline timeline = 
StreamerUtil.createMetaClient(conf).getActiveTimeline();
+    assertTrue(timeline.filterCompletedInstants()
+            .getInstants().stream().anyMatch(instant -> 
instant.getAction().equals("clean")),
+            "some commits should be cleaned");
+  }
+
   @Test
   void testStreamReadWithDeletes() throws Exception {
     // create filesystem table named source

Reply via email to