This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 73f63220e82b test: Make testReadChangelogIncremental parametrized (#17564)
73f63220e82b is described below
commit 73f63220e82bbd124dbbc204524d8577c95dba27
Author: Sergey Troshkov <[email protected]>
AuthorDate: Sat Dec 13 02:30:45 2025 +0700
test: Make testReadChangelogIncremental parametrized (#17564)
---
.../apache/hudi/table/ITTestHoodieDataSource.java | 49 ++++------------------
1 file changed, 9 insertions(+), 40 deletions(-)
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index 8eb38198cd22..fbb1f0cce8f1 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -1585,50 +1585,15 @@ public class ITTestHoodieDataSource {
assertRowsEquals(result, TestData.dataSetInsert(5, 6));
}
- @Test
- void testReadChangelogIncremental() throws Exception {
- TableEnvironment tableEnv = streamTableEnv;
- Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
- conf.set(FlinkOptions.TABLE_NAME, "t1");
- conf.set(FlinkOptions.INDEX_BOOTSTRAP_ENABLED, true); // for batch upsert
- conf.set(FlinkOptions.CDC_ENABLED, true);
-
- // write 3 batches of the same data set
- TestData.writeDataAsBatch(TestData.dataSetInsert(1, 2), conf);
- TestData.writeDataAsBatch(TestData.dataSetInsert(1, 2), conf);
- TestData.writeDataAsBatch(TestData.dataSetInsert(1, 2), conf);
-
- String latestCommit = TestUtils.getLastCompleteInstant(tempFile.getAbsolutePath());
-
- String hoodieTableDDL = sql("t1")
- .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
- .option(FlinkOptions.READ_START_COMMIT, latestCommit)
- .option(FlinkOptions.CDC_ENABLED, true)
- .end();
- tableEnv.executeSql(hoodieTableDDL);
-
- List<Row> result1 = CollectionUtil.iterableToList(
- () -> tableEnv.sqlQuery("select * from t1").execute().collect());
- assertRowsEquals(result1, TestData.dataSetUpsert(2, 1));
-
- // write another 10 batches of dataset
- for (int i = 0; i < 10; i++) {
- TestData.writeDataAsBatch(TestData.dataSetInsert(1, 2), conf);
- }
-
- String firstCommit = TestUtils.getFirstCompleteInstant(tempFile.getAbsolutePath());
- final String query = String.format("select count(*) from t1/*+ options('read.start-commit'='%s')*/", firstCommit);
- List<Row> result2 = CollectionUtil.iterableToList(
- () -> tableEnv.sqlQuery(query).execute().collect());
- assertRowsEquals(result2.subList(result2.size() - 2, result2.size()), "[-U[1], +U[2]]");
- }
-
- @Test
- void testReadChangelogIncrementalMor() throws Exception {
+ @ParameterizedTest
+ @EnumSource(value = HoodieTableType.class)
+ void testReadChangelogIncremental(HoodieTableType tableType) throws Exception {
TableEnvironment tableEnv = streamTableEnv;
Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
conf.set(FlinkOptions.TABLE_NAME, "t1");
+ conf.set(FlinkOptions.TABLE_TYPE, tableType.name());
conf.set(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
+ conf.set(FlinkOptions.COMPACTION_SCHEDULE_ENABLED, false);
conf.set(FlinkOptions.READ_CDC_FROM_CHANGELOG, false); // calculate the changes on the fly
conf.set(FlinkOptions.INDEX_BOOTSTRAP_ENABLED, true); // for batch upsert
conf.set(FlinkOptions.CDC_ENABLED, true);
@@ -1642,6 +1607,10 @@ public class ITTestHoodieDataSource {
String hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .option(FlinkOptions.TABLE_TYPE, tableType)
+ .option(FlinkOptions.COMPACTION_ASYNC_ENABLED, false)
+ .option(FlinkOptions.COMPACTION_SCHEDULE_ENABLED, false)
+ .option(FlinkOptions.READ_CDC_FROM_CHANGELOG, false)
.option(FlinkOptions.READ_START_COMMIT, latestCommit)
.option(FlinkOptions.CDC_ENABLED, true)
.end();