This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 58b53f05980 [HUDI-4945] Add a test case for batch clean (#6845)
58b53f05980 is described below
commit 58b53f0598015189b62eeb3417132bb1ae71d0bd
Author: HunterXHunter <[email protected]>
AuthorDate: Mon Jun 24 07:12:19 2024 +0800
[HUDI-4945] Add a test case for batch clean (#6845)
Co-authored-by: Y Ethan Guo <[email protected]>
---
.../apache/hudi/table/ITTestHoodieDataSource.java | 22 ++++++++++++++++++++++
1 file changed, 22 insertions(+)
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index 884e3a99cf4..d9f3badf1fb 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -447,6 +447,28 @@ public class ITTestHoodieDataSource {
"some commits should be cleaned");
}
+ @Test
+ void testBatchWriteWithCleaning() {
+ String hoodieTableDDL = sql("t1")
+ .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .option(FlinkOptions.CLEAN_RETAIN_COMMITS, 1)
+ .end();
+ batchTableEnv.executeSql(hoodieTableDDL);
+ String insertInto = "insert into t1 values\n"
+ + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1')";
+ execInsertSql(batchTableEnv, insertInto);
+ execInsertSql(batchTableEnv, insertInto);
+ execInsertSql(batchTableEnv, insertInto);
+ Configuration defaultConf =
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+ Map<String, String> options1 = new HashMap<>(defaultConf.toMap());
+ options1.put(FlinkOptions.TABLE_NAME.key(), "t1");
+ Configuration conf = Configuration.fromMap(options1);
+ HoodieTimeline timeline =
StreamerUtil.createMetaClient(conf).getActiveTimeline();
+ assertTrue(timeline.filterCompletedInstants()
+ .getInstants().stream().anyMatch(instant ->
instant.getAction().equals("clean")),
+ "some commits should be cleaned");
+ }
+
@Test
void testStreamReadWithDeletes() throws Exception {
// create filesystem table named source