This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new ad49c4f81f7c fix: incorrect CDC read from table with unfinished compaction (#17607)
ad49c4f81f7c is described below

commit ad49c4f81f7c05145ebac96d61c002f8db01e476
Author: Sergey Troshkov <[email protected]>
AuthorDate: Fri Dec 19 15:54:37 2025 +0700

    fix: incorrect CDC read from table with unfinished compaction (#17607)
---
 .../hudi/common/table/cdc/HoodieCDCExtractor.java  | 22 ++++++++
 .../apache/hudi/table/ITTestHoodieDataSource.java  | 62 +++++++++++++++++++---
 2 files changed, 78 insertions(+), 6 deletions(-)

diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java
index 4e60027a0538..74a16e6b042d 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java
@@ -54,6 +54,7 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
 import static org.apache.hudi.common.table.cdc.HoodieCDCInferenceCase.AS_IS;
@@ -341,6 +342,27 @@ public class HoodieCDCExtractor {
               .filter(logFile -> !logFile.equals(currentLogFileName))
               .map(logFile -> new StoragePath(partitionPath, logFile))
               .collect(Collectors.toList());
+          // get files list from unfinished compaction commit
+          List<StoragePath> filesToCompact = new ArrayList<>();
+          AtomicReference<String> lastBaseFile =  new AtomicReference<>();
+          
metaClient.getActiveTimeline().filterPendingCompactionTimeline().filter(i -> 
i.compareTo(instant) < 0).getInstants()
+              .forEach(i -> {
+                try {
+                  
metaClient.getActiveTimeline().readCompactionPlan(i).getOperations()
+                      .stream()
+                      .filter(op -> 
op.getPartitionPath().equals(fgId.getPartitionPath()) && 
op.getFileId().equals(fgId.getFileId()))
+                      .forEach(operation ->  {
+                        
filesToCompact.addAll(operation.getDeltaFilePaths().stream().map(logFile -> new 
StoragePath(partitionPath, logFile)).collect(Collectors.toList()));
+                        lastBaseFile.set(operation.getDataFilePath());
+                      });
+                } catch (IOException e) {
+                  throw new HoodieIOException("Failed to read a compaction 
plan on instant " + i, e);
+                }
+              });
+          if (baseFile == null && lastBaseFile.get() != null) {
+            baseFile = new HoodieBaseFile(storage.getPathInfo(new 
StoragePath(partitionPath, lastBaseFile.get())));
+          }
+          logFilePaths.addAll(filesToCompact);
           List<HoodieLogFile> logFiles = 
storage.listDirectEntries(logFilePaths).stream()
               .map(HoodieLogFile::new).collect(Collectors.toList());
           return Option.of(new FileSlice(fgId, instant.requestedTime(), 
baseFile, logFiles));
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index c12dd314fe06..0c7bbc46ad0b 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -1616,14 +1616,13 @@ public class ITTestHoodieDataSource {
   }
 
   @ParameterizedTest
-  @EnumSource(value = HoodieTableType.class)
-  void testReadChangelogIncremental(HoodieTableType tableType) throws 
Exception {
+  @MethodSource("tableTypeAndBooleanTrueFalseParams")
+  void testReadChangelogIncremental(HoodieTableType tableType, boolean 
compactionEnabled) throws Exception {
     TableEnvironment tableEnv = streamTableEnv;
     Configuration conf = 
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
     conf.set(FlinkOptions.TABLE_NAME, "t1");
     conf.set(FlinkOptions.TABLE_TYPE, tableType.name());
-    conf.set(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
-    conf.set(FlinkOptions.COMPACTION_SCHEDULE_ENABLED, false);
+    conf.set(FlinkOptions.COMPACTION_ASYNC_ENABLED, compactionEnabled);
     conf.set(FlinkOptions.READ_CDC_FROM_CHANGELOG, false); // calculate the 
changes on the fly
     conf.set(FlinkOptions.INDEX_BOOTSTRAP_ENABLED, true);  // for batch upsert
     conf.set(FlinkOptions.CDC_ENABLED, true);
@@ -1638,8 +1637,7 @@ public class ITTestHoodieDataSource {
     String hoodieTableDDL = sql("t1")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
         .option(FlinkOptions.TABLE_TYPE, tableType)
-        .option(FlinkOptions.COMPACTION_ASYNC_ENABLED, false)
-        .option(FlinkOptions.COMPACTION_SCHEDULE_ENABLED, false)
+        .option(FlinkOptions.COMPACTION_ASYNC_ENABLED, compactionEnabled)
         .option(FlinkOptions.READ_CDC_FROM_CHANGELOG, false)
         .option(FlinkOptions.READ_START_COMMIT, latestCommit)
         .option(FlinkOptions.CDC_ENABLED, true)
@@ -1662,6 +1660,58 @@ public class ITTestHoodieDataSource {
     assertRowsEquals(result2.subList(result2.size() - 2, result2.size()), 
"[-U[1], +U[2]]");
   }
 
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  void  testChangelogCompactionSchedule(Boolean compactionEnabled) throws 
Exception {
+    TableEnvironment tableEnv = streamTableEnv;
+    Configuration conf = 
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    conf.set(FlinkOptions.TABLE_NAME, "t1");
+    conf.set(FlinkOptions.TABLE_TYPE, MERGE_ON_READ.name());
+    conf.set(FlinkOptions.COMPACTION_ASYNC_ENABLED, compactionEnabled);
+    // schedule compaction after 2 commits
+    conf.set(FlinkOptions.COMPACTION_DELTA_COMMITS, 2);
+    conf.set(FlinkOptions.READ_CDC_FROM_CHANGELOG, false); // calculate the 
changes on the fly
+    conf.set(FlinkOptions.INDEX_BOOTSTRAP_ENABLED, true);  // for batch upsert
+    conf.set(FlinkOptions.CDC_ENABLED, true);
+
+    // write 3 batches of the same data set
+    TestData.writeDataAsBatch(TestData.dataSetInsert(1, 2), conf);
+    TestData.writeDataAsBatch(TestData.dataSetInsert(1, 2), conf);
+    TestData.writeDataAsBatch(TestData.dataSetInsert(1, 2), conf);
+
+    String latestCommit = 
TestUtils.getLastCompleteInstant(tempFile.getAbsolutePath());
+
+    String hoodieTableDDL = sql("t1")
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .option(FlinkOptions.TABLE_TYPE, MERGE_ON_READ)
+        .option(FlinkOptions.COMPACTION_ASYNC_ENABLED, compactionEnabled)
+        .option(FlinkOptions.COMPACTION_DELTA_COMMITS, 2)
+        .option(FlinkOptions.READ_CDC_FROM_CHANGELOG, false)
+        .option(FlinkOptions.READ_START_COMMIT, latestCommit)
+        .option(FlinkOptions.CDC_ENABLED, true)
+        .end();
+    tableEnv.executeSql(hoodieTableDDL);
+
+    String firstCommit = 
TestUtils.getFirstCompleteInstant(tempFile.getAbsolutePath());
+    String secondCommit = TestUtils.getNthCompleteInstant(new 
StoragePath(tempFile.getAbsolutePath()), 1, HoodieTimeline.DELTA_COMMIT_ACTION);
+    String thirdCommit = 
TestUtils.getLastCompleteInstant(tempFile.getAbsolutePath());
+    final String query1 = String.format("select count(*) from t1/*+ 
options('read.start-commit'='%s')*/", firstCommit);
+    final String query2 = String.format("select count(*) from t1/*+ 
options('read.start-commit'='%s')*/", secondCommit);
+    final String query3 = String.format("select count(*) from t1/*+ 
options('read.start-commit'='%s')*/", thirdCommit);
+    List<Row> result1 = CollectionUtil.iterableToList(
+        () -> tableEnv.sqlQuery(query1).execute().collect());
+    List<Row> result2 = CollectionUtil.iterableToList(
+        () -> tableEnv.sqlQuery(query2).execute().collect());
+    List<Row> result3 = CollectionUtil.iterableToList(
+        () -> tableEnv.sqlQuery(query3).execute().collect());
+    assertEquals(19, result1.size());
+    assertEquals(7, result2.size());
+    assertEquals(3, result3.size());
+    assertRowsEquals(result1.subList(result1.size() - 2, result1.size()), 
"[-U[1], +U[2]]");
+    assertRowsEquals(result2.subList(result2.size() - 2, result2.size()), 
"[-D[1], +I[1]]");
+    assertRowsEquals(result3.subList(result3.size() - 2, result3.size()), 
"[-D[1], +I[1]]");
+  }
+
   @ParameterizedTest
   @EnumSource(value = HoodieTableType.class)
   void testIncrementalReadArchivedCommits(HoodieTableType tableType) throws 
Exception {

Reply via email to