This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 73f63220e82b test: Make testReadChangelogIncremental parametrized 
(#17564)
73f63220e82b is described below

commit 73f63220e82bbd124dbbc204524d8577c95dba27
Author: Sergey Troshkov <[email protected]>
AuthorDate: Sat Dec 13 02:30:45 2025 +0700

    test: Make testReadChangelogIncremental parametrized (#17564)
---
 .../apache/hudi/table/ITTestHoodieDataSource.java  | 49 ++++------------------
 1 file changed, 9 insertions(+), 40 deletions(-)

diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index 8eb38198cd22..fbb1f0cce8f1 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -1585,50 +1585,15 @@ public class ITTestHoodieDataSource {
     assertRowsEquals(result, TestData.dataSetInsert(5, 6));
   }
 
-  @Test
-  void testReadChangelogIncremental() throws Exception {
-    TableEnvironment tableEnv = streamTableEnv;
-    Configuration conf = 
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
-    conf.set(FlinkOptions.TABLE_NAME, "t1");
-    conf.set(FlinkOptions.INDEX_BOOTSTRAP_ENABLED, true); // for batch upsert
-    conf.set(FlinkOptions.CDC_ENABLED, true);
-
-    // write 3 batches of the same data set
-    TestData.writeDataAsBatch(TestData.dataSetInsert(1, 2), conf);
-    TestData.writeDataAsBatch(TestData.dataSetInsert(1, 2), conf);
-    TestData.writeDataAsBatch(TestData.dataSetInsert(1, 2), conf);
-
-    String latestCommit = 
TestUtils.getLastCompleteInstant(tempFile.getAbsolutePath());
-
-    String hoodieTableDDL = sql("t1")
-        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
-        .option(FlinkOptions.READ_START_COMMIT, latestCommit)
-        .option(FlinkOptions.CDC_ENABLED, true)
-        .end();
-    tableEnv.executeSql(hoodieTableDDL);
-
-    List<Row> result1 = CollectionUtil.iterableToList(
-        () -> tableEnv.sqlQuery("select * from t1").execute().collect());
-    assertRowsEquals(result1, TestData.dataSetUpsert(2, 1));
-
-    // write another 10 batches of dataset
-    for (int i = 0; i < 10; i++) {
-      TestData.writeDataAsBatch(TestData.dataSetInsert(1, 2), conf);
-    }
-
-    String firstCommit = 
TestUtils.getFirstCompleteInstant(tempFile.getAbsolutePath());
-    final String query = String.format("select count(*) from t1/*+ 
options('read.start-commit'='%s')*/", firstCommit);
-    List<Row> result2 = CollectionUtil.iterableToList(
-        () -> tableEnv.sqlQuery(query).execute().collect());
-    assertRowsEquals(result2.subList(result2.size() - 2, result2.size()), 
"[-U[1], +U[2]]");
-  }
-
-  @Test
-  void testReadChangelogIncrementalMor() throws Exception {
+  @ParameterizedTest
+  @EnumSource(value = HoodieTableType.class)
+  void testReadChangelogIncremental(HoodieTableType tableType) throws 
Exception {
     TableEnvironment tableEnv = streamTableEnv;
     Configuration conf = 
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
     conf.set(FlinkOptions.TABLE_NAME, "t1");
+    conf.set(FlinkOptions.TABLE_TYPE, tableType.name());
     conf.set(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
+    conf.set(FlinkOptions.COMPACTION_SCHEDULE_ENABLED, false);
     conf.set(FlinkOptions.READ_CDC_FROM_CHANGELOG, false); // calculate the 
changes on the fly
     conf.set(FlinkOptions.INDEX_BOOTSTRAP_ENABLED, true);  // for batch upsert
     conf.set(FlinkOptions.CDC_ENABLED, true);
@@ -1642,6 +1607,10 @@ public class ITTestHoodieDataSource {
 
     String hoodieTableDDL = sql("t1")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .option(FlinkOptions.TABLE_TYPE, tableType)
+        .option(FlinkOptions.COMPACTION_ASYNC_ENABLED, false)
+        .option(FlinkOptions.COMPACTION_SCHEDULE_ENABLED, false)
+        .option(FlinkOptions.READ_CDC_FROM_CHANGELOG, false)
         .option(FlinkOptions.READ_START_COMMIT, latestCommit)
         .option(FlinkOptions.CDC_ENABLED, true)
         .end();

Reply via email to