This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new ad49c4f81f7c fix: incorrect CDC read from table with unfinished
compaction (#17607)
ad49c4f81f7c is described below
commit ad49c4f81f7c05145ebac96d61c002f8db01e476
Author: Sergey Troshkov <[email protected]>
AuthorDate: Fri Dec 19 15:54:37 2025 +0700
fix: incorrect CDC read from table with unfinished compaction (#17607)
---
.../hudi/common/table/cdc/HoodieCDCExtractor.java | 22 ++++++++
.../apache/hudi/table/ITTestHoodieDataSource.java | 62 +++++++++++++++++++---
2 files changed, 78 insertions(+), 6 deletions(-)
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java
index 4e60027a0538..74a16e6b042d 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java
@@ -54,6 +54,7 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
 import static org.apache.hudi.common.table.cdc.HoodieCDCInferenceCase.AS_IS;
@@ -341,6 +342,27 @@ public class HoodieCDCExtractor {
             .filter(logFile -> !logFile.equals(currentLogFileName))
             .map(logFile -> new StoragePath(partitionPath, logFile))
             .collect(Collectors.toList());
+    // get files list from unfinished compaction commit
+    List<StoragePath> filesToCompact = new ArrayList<>();
+    AtomicReference<String> lastBaseFile = new AtomicReference<>();
+    metaClient.getActiveTimeline().filterPendingCompactionTimeline().filter(i -> i.compareTo(instant) < 0).getInstants()
+        .forEach(i -> {
+          try {
+            metaClient.getActiveTimeline().readCompactionPlan(i).getOperations()
+                .stream()
+                .filter(op -> op.getPartitionPath().equals(fgId.getPartitionPath()) && op.getFileId().equals(fgId.getFileId()))
+                .forEach(operation -> {
+                  filesToCompact.addAll(operation.getDeltaFilePaths().stream().map(logFile -> new StoragePath(partitionPath, logFile)).collect(Collectors.toList()));
+                  lastBaseFile.set(operation.getDataFilePath());
+                });
+          } catch (IOException e) {
+            throw new HoodieIOException("Failed to read a compaction plan on instant " + i, e);
+          }
+        });
+    if (baseFile == null && lastBaseFile.get() != null) {
+      baseFile = new HoodieBaseFile(storage.getPathInfo(new StoragePath(partitionPath, lastBaseFile.get())));
+    }
+    logFilePaths.addAll(filesToCompact);
     List<HoodieLogFile> logFiles = storage.listDirectEntries(logFilePaths).stream()
         .map(HoodieLogFile::new).collect(Collectors.toList());
     return Option.of(new FileSlice(fgId, instant.requestedTime(), baseFile, logFiles));
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index c12dd314fe06..0c7bbc46ad0b 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -1616,14 +1616,13 @@ public class ITTestHoodieDataSource {
   }
 
   @ParameterizedTest
-  @EnumSource(value = HoodieTableType.class)
-  void testReadChangelogIncremental(HoodieTableType tableType) throws Exception {
+  @MethodSource("tableTypeAndBooleanTrueFalseParams")
+  void testReadChangelogIncremental(HoodieTableType tableType, boolean compactionEnabled) throws Exception {
     TableEnvironment tableEnv = streamTableEnv;
     Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
     conf.set(FlinkOptions.TABLE_NAME, "t1");
     conf.set(FlinkOptions.TABLE_TYPE, tableType.name());
-    conf.set(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
-    conf.set(FlinkOptions.COMPACTION_SCHEDULE_ENABLED, false);
+    conf.set(FlinkOptions.COMPACTION_ASYNC_ENABLED, compactionEnabled);
     conf.set(FlinkOptions.READ_CDC_FROM_CHANGELOG, false); // calculate the changes on the fly
     conf.set(FlinkOptions.INDEX_BOOTSTRAP_ENABLED, true); // for batch upsert
     conf.set(FlinkOptions.CDC_ENABLED, true);
@@ -1638,8 +1637,7 @@ public class ITTestHoodieDataSource {
     String hoodieTableDDL = sql("t1")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
         .option(FlinkOptions.TABLE_TYPE, tableType)
-        .option(FlinkOptions.COMPACTION_ASYNC_ENABLED, false)
-        .option(FlinkOptions.COMPACTION_SCHEDULE_ENABLED, false)
+        .option(FlinkOptions.COMPACTION_ASYNC_ENABLED, compactionEnabled)
         .option(FlinkOptions.READ_CDC_FROM_CHANGELOG, false)
         .option(FlinkOptions.READ_START_COMMIT, latestCommit)
         .option(FlinkOptions.CDC_ENABLED, true)
@@ -1662,6 +1660,58 @@ public class ITTestHoodieDataSource {
     assertRowsEquals(result2.subList(result2.size() - 2, result2.size()), "[-U[1], +U[2]]");
   }
 
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  void testChangelogCompactionSchedule(Boolean compactionEnabled) throws Exception {
+    TableEnvironment tableEnv = streamTableEnv;
+    Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    conf.set(FlinkOptions.TABLE_NAME, "t1");
+    conf.set(FlinkOptions.TABLE_TYPE, MERGE_ON_READ.name());
+    conf.set(FlinkOptions.COMPACTION_ASYNC_ENABLED, compactionEnabled);
+    // schedule compaction after 2 commits
+    conf.set(FlinkOptions.COMPACTION_DELTA_COMMITS, 2);
+    conf.set(FlinkOptions.READ_CDC_FROM_CHANGELOG, false); // calculate the changes on the fly
+    conf.set(FlinkOptions.INDEX_BOOTSTRAP_ENABLED, true); // for batch upsert
+    conf.set(FlinkOptions.CDC_ENABLED, true);
+
+    // write 3 batches of the same data set
+    TestData.writeDataAsBatch(TestData.dataSetInsert(1, 2), conf);
+    TestData.writeDataAsBatch(TestData.dataSetInsert(1, 2), conf);
+    TestData.writeDataAsBatch(TestData.dataSetInsert(1, 2), conf);
+
+    String latestCommit = TestUtils.getLastCompleteInstant(tempFile.getAbsolutePath());
+
+    String hoodieTableDDL = sql("t1")
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .option(FlinkOptions.TABLE_TYPE, MERGE_ON_READ)
+        .option(FlinkOptions.COMPACTION_ASYNC_ENABLED, compactionEnabled)
+        .option(FlinkOptions.COMPACTION_DELTA_COMMITS, 2)
+        .option(FlinkOptions.READ_CDC_FROM_CHANGELOG, false)
+        .option(FlinkOptions.READ_START_COMMIT, latestCommit)
+        .option(FlinkOptions.CDC_ENABLED, true)
+        .end();
+    tableEnv.executeSql(hoodieTableDDL);
+
+    String firstCommit = TestUtils.getFirstCompleteInstant(tempFile.getAbsolutePath());
+    String secondCommit = TestUtils.getNthCompleteInstant(new StoragePath(tempFile.getAbsolutePath()), 1, HoodieTimeline.DELTA_COMMIT_ACTION);
+    String thirdCommit = TestUtils.getLastCompleteInstant(tempFile.getAbsolutePath());
+    final String query1 = String.format("select count(*) from t1/*+ options('read.start-commit'='%s')*/", firstCommit);
+    final String query2 = String.format("select count(*) from t1/*+ options('read.start-commit'='%s')*/", secondCommit);
+    final String query3 = String.format("select count(*) from t1/*+ options('read.start-commit'='%s')*/", thirdCommit);
+    List<Row> result1 = CollectionUtil.iterableToList(
+        () -> tableEnv.sqlQuery(query1).execute().collect());
+    List<Row> result2 = CollectionUtil.iterableToList(
+        () -> tableEnv.sqlQuery(query2).execute().collect());
+    List<Row> result3 = CollectionUtil.iterableToList(
+        () -> tableEnv.sqlQuery(query3).execute().collect());
+    assertEquals(19, result1.size());
+    assertEquals(7, result2.size());
+    assertEquals(3, result3.size());
+    assertRowsEquals(result1.subList(result1.size() - 2, result1.size()), "[-U[1], +U[2]]");
+    assertRowsEquals(result2.subList(result2.size() - 2, result2.size()), "[-D[1], +I[1]]");
+    assertRowsEquals(result3.subList(result3.size() - 2, result3.size()), "[-D[1], +I[1]]");
+  }
+
   @ParameterizedTest
   @EnumSource(value = HoodieTableType.class)
   void testIncrementalReadArchivedCommits(HoodieTableType tableType) throws Exception {