This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 87f9fdaee90 [HUDI-7911] Enable cdc log for MOR table (#11490)
87f9fdaee90 is described below

commit 87f9fdaee9047caeb5bb0628feabec58a5b2ed10
Author: Danny Chan <[email protected]>
AuthorDate: Tue Jul 9 08:28:43 2024 +0800

    [HUDI-7911] Enable cdc log for MOR table (#11490)
---
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  18 +-
 .../org/apache/hudi/io/HoodieAppendHandle.java     |   2 +-
 .../apache/hudi/io/HoodieMergeHandleFactory.java   |   6 +
 .../hudi/common/model/WriteOperationType.java      |   2 +-
 .../hudi/common/table/cdc/HoodieCDCExtractor.java  |  14 +-
 .../table/read/IncrementalQueryAnalyzer.java       |  40 ++++-
 .../apache/hudi/configuration/FlinkOptions.java    |  11 ++
 .../apache/hudi/configuration/OptionsResolver.java |  23 ++-
 .../apache/hudi/source/IncrementalInputSplits.java |   4 +-
 .../hudi/table/format/cdc/CdcInputFormat.java      | 188 ++++++++++++++++++++-
 .../table/format/mor/MergeOnReadInputFormat.java   |  21 ++-
 .../utils/BucketStreamWriteFunctionWrapper.java    |  11 ++
 .../sink/utils/StreamWriteFunctionWrapper.java     |  11 ++
 .../hudi/sink/utils/TestFunctionWrapper.java       |   8 +
 .../apache/hudi/table/ITTestHoodieDataSource.java  |  40 +++++
 .../apache/hudi/table/format/TestInputFormat.java  |  63 ++++++-
 .../org/apache/hudi/utils/TestCompactionUtil.java  |   1 +
 .../test/java/org/apache/hudi/utils/TestData.java  |   2 +
 .../scala/org/apache/hudi/cdc/CDCRelation.scala    |   3 +-
 19 files changed, 435 insertions(+), 33 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index ac728ec8de6..268a0c81fae 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -1295,11 +1295,6 @@ public class HoodieWriteConfig extends HoodieConfig {
     return 
ExecutorType.valueOf(getStringOrDefault(WRITE_EXECUTOR_TYPE).toUpperCase(Locale.ROOT));
   }
 
-  public boolean isCDCEnabled() {
-    return getBooleanOrDefault(
-        HoodieTableConfig.CDC_ENABLED, 
HoodieTableConfig.CDC_ENABLED.defaultValue());
-  }
-
   public boolean isConsistentHashingEnabled() {
     return getIndexType() == HoodieIndex.IndexType.BUCKET && 
getBucketIndexEngineType() == 
HoodieIndex.BucketIndexEngineType.CONSISTENT_HASHING;
   }
@@ -1309,6 +1304,19 @@ public class HoodieWriteConfig extends HoodieConfig {
         && 
HoodieIndex.BucketIndexEngineType.SIMPLE.equals(getBucketIndexEngineType());
   }
 
+  /**
+   * Returns whether the table writer would generate pure log files in the 
very first place.
+   */
+  public boolean isYieldingPureLogForMor() {
+    switch (getIndexType()) {
+      case BUCKET:
+      case FLINK_STATE:
+        return true;
+      default:
+        return false;
+    }
+  }
+
   public boolean isConsistentLogicalTimestampEnabled() {
     return 
getBooleanOrDefault(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED);
   }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
index b02fe871b27..30bcc767274 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
@@ -172,7 +172,7 @@ public class HoodieAppendHandle<T, I, K, O> extends 
HoodieWriteHandle<T, I, K, O
     String prevCommit = instantTime;
     String baseFile = "";
     List<String> logFiles = new ArrayList<>();
-    if (config.isCDCEnabled()) {
+    if (hoodieTable.getMetaClient().getTableConfig().isCDCEnabled()) {
       // the cdc reader needs the base file metadata to have deterministic 
update sequence.
       TableFileSystemView.SliceView rtView = hoodieTable.getSliceView();
       Option<FileSlice> fileSlice = rtView.getLatestFileSlice(partitionPath, 
fileId);
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleFactory.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleFactory.java
index ac8a08c96ad..d049f884b51 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleFactory.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleFactory.java
@@ -88,6 +88,12 @@ public class HoodieMergeHandleFactory {
     if (table.requireSortedRecords()) {
       return new HoodieSortedMergeHandle<>(writeConfig, instantTime, table, 
keyToNewRecords, partitionPath, fileId,
           dataFileToBeMerged, taskContextSupplier, keyGeneratorOpt);
+    } else if (table.getMetaClient().getTableConfig().isCDCEnabled() && 
writeConfig.isYieldingPureLogForMor()) {
+      // IMPORTANT: only index types that yield pure log files need to enable 
the cdc log files for compaction,
+      // index type such as the BLOOM does not need this because it would do 
delta merge for inserts and generates log for updates,
+      // both of these two cases are already handled in HoodieCDCExtractor.
+      return new HoodieMergeHandleWithChangeLog<>(writeConfig, instantTime, 
table, keyToNewRecords, partitionPath, fileId,
+          dataFileToBeMerged, taskContextSupplier, keyGeneratorOpt);
     } else {
       return new HoodieMergeHandle<>(writeConfig, instantTime, table, 
keyToNewRecords, partitionPath, fileId,
           dataFileToBeMerged, taskContextSupplier, keyGeneratorOpt);
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java
index 96e00e6b955..268703710c5 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java
@@ -126,7 +126,7 @@ public enum WriteOperationType {
   /**
    * Whether the operation changes the dataset.
    */
-  public static boolean isDataChange(WriteOperationType operation) {
+  public static boolean yieldChanges(WriteOperationType operation) {
     return operation == WriteOperationType.INSERT
         || operation == WriteOperationType.UPSERT
         || operation == WriteOperationType.UPSERT_PREPPED
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java
index e38651336ad..28de790050a 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java
@@ -90,14 +90,18 @@ public class HoodieCDCExtractor {
 
   private HoodieTableFileSystemView fsView;
 
+  private final boolean consumeChangesFromCompaction;
+
   public HoodieCDCExtractor(
       HoodieTableMetaClient metaClient,
-      InstantRange range) {
+      InstantRange range,
+      boolean consumeChangesFromCompaction) {
     this.metaClient = metaClient;
     this.basePath = metaClient.getBasePath();
     this.storage = metaClient.getStorage();
     this.supplementalLoggingMode = 
metaClient.getTableConfig().cdcSupplementalLoggingMode();
     this.instantRange = range;
+    this.consumeChangesFromCompaction = consumeChangesFromCompaction;
     init();
   }
 
@@ -226,7 +230,8 @@ public class HoodieCDCExtractor {
             }
             return Pair.of(instant, commitMetadata);
           }).filter(pair ->
-              
WriteOperationType.isDataChange(pair.getRight().getOperationType())
+              
WriteOperationType.yieldChanges(pair.getRight().getOperationType())
+                  || (this.consumeChangesFromCompaction && 
pair.getRight().getOperationType() == WriteOperationType.COMPACT)
           ).collect(Collectors.toMap(Pair::getLeft, Pair::getRight));
     } catch (Exception e) {
       throw new HoodieIOException("Fail to get the commit metadata for CDC");
@@ -322,8 +327,9 @@ public class HoodieCDCExtractor {
       if (fileSliceOpt.isPresent()) {
         Pair<String, List<String>> fileSlice = fileSliceOpt.get();
         try {
-          HoodieBaseFile baseFile = new HoodieBaseFile(
-              storage.getPathInfo(new StoragePath(partitionPath, 
fileSlice.getLeft())));
+          HoodieBaseFile baseFile = fileSlice.getLeft().isEmpty()
+              ? null
+              : new HoodieBaseFile(storage.getPathInfo(new 
StoragePath(partitionPath, fileSlice.getLeft())));
           List<StoragePath> logFilePaths = fileSlice.getRight().stream()
               .filter(logFile -> !logFile.equals(currentLogFileName))
               .map(logFile -> new StoragePath(partitionPath, logFile))
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/IncrementalQueryAnalyzer.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/IncrementalQueryAnalyzer.java
index ca8ae575898..03c1f85f598 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/IncrementalQueryAnalyzer.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/IncrementalQueryAnalyzer.java
@@ -112,6 +112,7 @@ public class IncrementalQueryAnalyzer {
   private final boolean skipCompaction;
   private final boolean skipClustering;
   private final boolean skipInsertOverwrite;
+  private final boolean readCdcFromChangelog;
   private final int limit;
 
   private IncrementalQueryAnalyzer(
@@ -122,6 +123,7 @@ public class IncrementalQueryAnalyzer {
       boolean skipCompaction,
       boolean skipClustering,
       boolean skipInsertOverwrite,
+      boolean readCdcFromChangelog,
       int limit) {
     this.metaClient = metaClient;
     this.startTime = Option.ofNullable(startTime);
@@ -130,6 +132,7 @@ public class IncrementalQueryAnalyzer {
     this.skipCompaction = skipCompaction;
     this.skipClustering = skipClustering;
     this.skipInsertOverwrite = skipInsertOverwrite;
+    this.readCdcFromChangelog = readCdcFromChangelog;
     this.limit = limit;
   }
 
@@ -209,13 +212,23 @@ public class IncrementalQueryAnalyzer {
 
   private HoodieTimeline getFilteredTimeline(HoodieTableMetaClient metaClient) 
{
     HoodieTimeline timeline = 
metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants();
-    return filterInstantsAsPerUserConfigs(metaClient, timeline, 
this.skipCompaction, this.skipClustering, this.skipInsertOverwrite);
+    return filterInstantsAsPerUserConfigs(metaClient, timeline, 
this.skipCompaction, this.skipClustering, this.skipInsertOverwrite, 
this.readCdcFromChangelog);
   }
 
   private HoodieTimeline getArchivedReadTimeline(HoodieTableMetaClient 
metaClient, String startInstant) {
     HoodieArchivedTimeline archivedTimeline = 
metaClient.getArchivedTimeline(startInstant, false);
     HoodieTimeline archivedCompleteTimeline = 
archivedTimeline.getCommitsTimeline().filterCompletedInstants();
-    return filterInstantsAsPerUserConfigs(metaClient, 
archivedCompleteTimeline, this.skipCompaction, this.skipClustering, 
this.skipInsertOverwrite);
+    return filterInstantsAsPerUserConfigs(metaClient, 
archivedCompleteTimeline, this.skipCompaction, this.skipClustering, 
this.skipInsertOverwrite, this.readCdcFromChangelog);
+  }
+
+  @VisibleForTesting
+  public static HoodieTimeline filterInstantsAsPerUserConfigs(
+      HoodieTableMetaClient metaClient,
+      HoodieTimeline timeline,
+      boolean skipCompaction,
+      boolean skipClustering,
+      boolean skipInsertOverwrite) {
+    return filterInstantsAsPerUserConfigs(metaClient, timeline, 
skipCompaction, skipClustering, skipInsertOverwrite, false);
   }
 
   /**
@@ -225,13 +238,22 @@ public class IncrementalQueryAnalyzer {
    *
    * @return the filtered timeline
    */
-  @VisibleForTesting
-  public static HoodieTimeline 
filterInstantsAsPerUserConfigs(HoodieTableMetaClient metaClient, HoodieTimeline 
timeline, boolean skipCompaction, boolean skipClustering, boolean 
skipInsertOverwrite) {
+  private static HoodieTimeline filterInstantsAsPerUserConfigs(
+      HoodieTableMetaClient metaClient,
+      HoodieTimeline timeline,
+      boolean skipCompaction,
+      boolean skipClustering,
+      boolean skipInsertOverwrite,
+      boolean readCdcFromChangelog) {
     final HoodieTimeline oriTimeline = timeline;
-    if (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ & 
skipCompaction) {
+    if (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ && 
skipCompaction) {
       // the compaction commit uses 'commit' as action which is tricky
       timeline = timeline.filter(instant -> 
!instant.getAction().equals(HoodieTimeline.COMMIT_ACTION));
     }
+    if (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ && 
metaClient.getTableConfig().isCDCEnabled() && readCdcFromChangelog) {
+      // only compaction yields changelog files
+      timeline = timeline.filter(instant -> 
!instant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION));
+    }
     if (skipClustering) {
       timeline = timeline.filter(instant -> 
!ClusteringUtils.isCompletedClusteringInstant(instant, oriTimeline));
     }
@@ -261,6 +283,7 @@ public class IncrementalQueryAnalyzer {
     private boolean skipCompaction = false;
     private boolean skipClustering = false;
     private boolean skipInsertOverwrite = false;
+    private boolean readCdcFromChangelog = false;
     /**
      * Maximum number of instants to read per run.
      */
@@ -304,6 +327,11 @@ public class IncrementalQueryAnalyzer {
       return this;
     }
 
+    public Builder readCdcFromChangelog(boolean readCdcFromChangelog) {
+      this.readCdcFromChangelog = readCdcFromChangelog;
+      return this;
+    }
+
     public Builder limit(int limit) {
       this.limit = limit;
       return this;
@@ -311,7 +339,7 @@ public class IncrementalQueryAnalyzer {
 
     public IncrementalQueryAnalyzer build() {
       return new 
IncrementalQueryAnalyzer(Objects.requireNonNull(this.metaClient), 
this.startTime, this.endTime,
-          Objects.requireNonNull(this.rangeType), this.skipCompaction, 
this.skipClustering, this.skipInsertOverwrite, this.limit);
+          Objects.requireNonNull(this.rangeType), this.skipCompaction, 
this.skipClustering, this.skipInsertOverwrite, this.readCdcFromChangelog, 
this.limit);
     }
   }
 
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 9a8b99a85c5..0a0f0612693 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -361,6 +361,17 @@ public class FlinkOptions extends HoodieConfig {
           + "the avg read instants number per-second would be 
'read.commits.limit'/'read.streaming.check-interval', by "
           + "default no limit");
 
+  @AdvancedConfig
+  public static final ConfigOption<Boolean> READ_CDC_FROM_CHANGELOG = 
ConfigOptions
+      .key("read.cdc.from.changelog")
+      .booleanType()
+      .defaultValue(true)
+      .withDescription("Whether to consume the delta changes only from the cdc 
changelog files.\n"
+          + "When CDC is enabled, i). for COW table, the changelog is 
generated on each file update;\n"
+          + "ii). for MOR table, the changelog is generated on compaction.\n"
+          + "By default, always read from the changelog file,\n"
+          + "once it is disabled, the reader would infer the changes based on 
the file slice dependencies.");
+
   @AdvancedConfig
   public static final ConfigOption<Boolean> READ_DATA_SKIPPING_ENABLED = 
ConfigOptions
       .key("read.data.skipping.enabled")
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
index e2a1ab0c320..9d4e289cb20 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
@@ -194,7 +194,7 @@ public class OptionsResolver {
   /**
    * Returns whether the source should emit changelog.
    *
-   * @return true if the source is read as streaming with changelog mode 
enabled
+   * @return true if the source should emit changes.
    */
   public static boolean emitChangelog(Configuration conf) {
     return conf.getBoolean(FlinkOptions.READ_AS_STREAMING) && 
conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED)
@@ -202,6 +202,15 @@ public class OptionsResolver {
         || isIncrementalQuery(conf) && 
conf.getBoolean(FlinkOptions.CDC_ENABLED);
   }
 
+  /**
+   * Returns whether the source should emit deletes.
+   *
+   * @return true if the source is read as streaming with changelog mode 
enabled.
+   */
+  public static boolean emitDeletes(Configuration conf) {
+    return conf.getBoolean(FlinkOptions.READ_AS_STREAMING) && 
conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED);
+  }
+
   /**
    * Returns whether there is need to schedule the async compaction.
    *
@@ -354,6 +363,18 @@ public class OptionsResolver {
     return handlingMode == HollowCommitHandling.USE_TRANSITION_TIME;
   }
 
+  /**
+   * Returns whether to read the data changes only from changelog files. When 
CDC is enabled,
+   * i) for COW table, the changelog is generated on each file update;
+   * ii) for MOR table, the changelog is generated on compaction.
+   *
+   * <p>By default, always read from the changelog file,
+   * when it is disabled, the reader infers the changes based on the file 
slice dependencies.
+   */
+  public static boolean readCDCFromChangelog(Configuration conf) {
+    return conf.getBoolean(FlinkOptions.READ_CDC_FROM_CHANGELOG);
+  }
+
   /**
    * Returns the index type.
    */
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
index 3ffed285584..f2c13a1165e 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
@@ -141,6 +141,7 @@ public class IncrementalInputSplits implements Serializable 
{
         .skipCompaction(skipCompaction)
         .skipClustering(skipClustering)
         .skipInsertOverwrite(skipInsertOverwrite)
+        
.readCdcFromChangelog(this.conf.getBoolean(FlinkOptions.READ_CDC_FROM_CHANGELOG))
         .build();
 
     IncrementalQueryAnalyzer.QueryContext analyzingResult = analyzer.analyze();
@@ -251,6 +252,7 @@ public class IncrementalInputSplits implements Serializable 
{
         .skipCompaction(skipCompaction)
         .skipClustering(skipClustering)
         .skipInsertOverwrite(skipInsertOverwrite)
+        
.readCdcFromChangelog(this.conf.getBoolean(FlinkOptions.READ_CDC_FROM_CHANGELOG))
         .limit(OptionsResolver.getReadCommitsLimit(conf))
         .build();
 
@@ -383,7 +385,7 @@ public class IncrementalInputSplits implements Serializable 
{
   private List<MergeOnReadInputSplit> getCdcInputSplits(
       HoodieTableMetaClient metaClient,
       InstantRange instantRange) {
-    HoodieCDCExtractor extractor = new HoodieCDCExtractor(metaClient, 
instantRange);
+    HoodieCDCExtractor extractor = new HoodieCDCExtractor(metaClient, 
instantRange, OptionsResolver.readCDCFromChangelog(this.conf));
     Map<HoodieFileGroupId, List<HoodieCDCFileSplit>> fileSplits = 
extractor.extractCDCFileSplits();
 
     if (fileSplits.isEmpty()) {
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
index 8cd8be8b5cc..97d3278977d 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
@@ -19,17 +19,27 @@
 package org.apache.hudi.table.format.cdc;
 
 import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.EngineType;
+import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.BaseFile;
 import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
+import org.apache.hudi.common.model.HoodieAvroRecord;
 import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.table.cdc.HoodieCDCFileSplit;
 import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
 import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
 import org.apache.hudi.common.table.log.HoodieCDCLogRecordIterator;
+import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
+import org.apache.hudi.common.util.HoodieRecordUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.ExternalSpillableMap;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.OptionsResolver;
@@ -48,11 +58,13 @@ import 
org.apache.hudi.table.format.mor.MergeOnReadTableState;
 import org.apache.hudi.util.AvroToRowDataConverters;
 import org.apache.hudi.util.FlinkWriteClients;
 import org.apache.hudi.util.RowDataProjection;
+import org.apache.hudi.util.RowDataToAvroConverters;
 import org.apache.hudi.util.StreamerUtil;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.avro.generic.IndexedRecord;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
@@ -62,6 +74,8 @@ import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.types.RowKind;
 import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -69,6 +83,7 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -84,6 +99,7 @@ import static 
org.apache.hudi.table.format.FormatUtils.buildAvroRecordBySchema;
  */
 public class CdcInputFormat extends MergeOnReadInputFormat {
   private static final long serialVersionUID = 1L;
+  private static final Logger LOG = 
LoggerFactory.getLogger(CdcInputFormat.class);
 
   private CdcInputFormat(
       Configuration conf,
@@ -163,7 +179,7 @@ public class CdcInputFormat extends MergeOnReadInputFormat {
     switch (fileSplit.getCdcInferCase()) {
       case BASE_FILE_INSERT:
         ValidationUtils.checkState(fileSplit.getCdcFiles() != null && 
fileSplit.getCdcFiles().size() == 1,
-            "CDC file path should exist and be only one");
+            "CDC file path should exist and be singleton");
         String path = new Path(tablePath, 
fileSplit.getCdcFiles().get(0)).toString();
         return new AddBaseFileIterator(getBaseFileIterator(path));
       case BASE_FILE_DELETE:
@@ -185,6 +201,12 @@ public class CdcInputFormat extends MergeOnReadInputFormat 
{
           default:
             throw new AssertionError("Unexpected mode" + mode);
         }
+      case LOG_FILE:
+        ValidationUtils.checkState(fileSplit.getCdcFiles() != null && 
fileSplit.getCdcFiles().size() == 1,
+            "CDC file path should exist and be singleton");
+        String logFilepath = new Path(tablePath, 
fileSplit.getCdcFiles().get(0)).toString();
+        return new DataLogFileIterator(conf, hadoopConf, 
internalSchemaManager, maxCompactionMemoryInBytes, imageManager, fileSplit,
+            singleLogFile2Split(tablePath, logFilepath, 
maxCompactionMemoryInBytes), tableState);
       case REPLACE_COMMIT:
         return new ReplaceCommitIterator(conf, tablePath, tableState, 
fileSplit, this::getFileSliceIterator);
       default:
@@ -309,6 +331,135 @@ public class CdcInputFormat extends 
MergeOnReadInputFormat {
     }
   }
 
+  // according to HoodieCDCInferenceCase.LOG_FILE
+  static class DataLogFileIterator implements ClosableIterator<RowData> {
+    private final Schema tableSchema;
+    private final long maxCompactionMemoryInBytes;
+    private final ImageManager imageManager;
+    private final HoodieMergedLogRecordScanner scanner;
+    private final Iterator<String> logRecordsKeyIterator;
+    private final RowDataProjection projection;
+    private final AvroToRowDataConverters.AvroToRowDataConverter 
avroToRowDataConverter;
+    private final RowDataToAvroConverters.RowDataToAvroConverter 
rowDataToAvroConverter;
+    private final HoodieRecordMerger recordMerger;
+    private final TypedProperties payloadProps;
+
+    private ExternalSpillableMap<String, byte[]> beforeImages;
+    private RowData currentImage;
+    private RowData sideImage;
+
+    DataLogFileIterator(
+        Configuration flinkConf,
+        org.apache.hadoop.conf.Configuration hadoopConf,
+        InternalSchemaManager schemaManager,
+        long maxCompactionMemoryInBytes,
+        ImageManager imageManager,
+        HoodieCDCFileSplit cdcFileSplit,
+        MergeOnReadInputSplit split,
+        MergeOnReadTableState tableState) throws IOException {
+      this.tableSchema = new Schema.Parser().parse(tableState.getAvroSchema());
+      this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes;
+      this.imageManager = imageManager;
+      this.scanner = FormatUtils.logScanner(split, tableSchema, 
schemaManager.getQuerySchema(), flinkConf, hadoopConf);
+      this.logRecordsKeyIterator = scanner.getRecords().keySet().iterator();
+      this.avroToRowDataConverter = 
AvroToRowDataConverters.createRowConverter(tableState.getRowType(), 
flinkConf.getBoolean(FlinkOptions.READ_UTC_TIMEZONE));
+      this.rowDataToAvroConverter = 
RowDataToAvroConverters.createConverter(tableState.getRowType(), 
flinkConf.getBoolean(FlinkOptions.READ_UTC_TIMEZONE));
+      this.projection = 
tableState.getRequiredRowType().equals(tableState.getRowType())
+          ? null
+          : RowDataProjection.instance(tableState.getRequiredRowType(), 
tableState.getRequiredPositions());
+
+      List<String> mergers = 
Arrays.stream(flinkConf.getString(FlinkOptions.RECORD_MERGER_IMPLS).split(","))
+          .map(String::trim)
+          .distinct()
+          .collect(Collectors.toList());
+      this.recordMerger = 
HoodieRecordUtils.createRecordMerger(split.getTablePath(), EngineType.FLINK, 
mergers, flinkConf.getString(FlinkOptions.RECORD_MERGER_STRATEGY));
+      this.payloadProps = StreamerUtil.getPayloadConfig(flinkConf).getProps();
+      initImages(cdcFileSplit);
+    }
+
+    private void initImages(HoodieCDCFileSplit fileSplit) throws IOException {
+      // init before images
+      if (fileSplit.getBeforeFileSlice().isPresent() && 
!fileSplit.getBeforeFileSlice().get().isEmpty()) {
+        this.beforeImages = this.imageManager.getOrLoadImages(
+            maxCompactionMemoryInBytes, fileSplit.getBeforeFileSlice().get());
+      } else {
+        // still initializes an empty map
+        this.beforeImages = 
FormatUtils.spillableMap(this.imageManager.writeConfig, 
maxCompactionMemoryInBytes);
+      }
+    }
+
+    @Override
+    public boolean hasNext() {
+      if (this.sideImage != null) {
+        this.currentImage = this.sideImage;
+        this.sideImage = null;
+        return true;
+      }
+      while (logRecordsKeyIterator.hasNext()) {
+        String recordKey = logRecordsKeyIterator.next();
+        HoodieAvroRecord<?> record = (HoodieAvroRecord<?>) 
scanner.getRecords().get(recordKey);
+        Option<IndexedRecord> val = 
MergeOnReadInputFormat.getInsertVal(record, this.tableSchema);
+        RowData existed = 
imageManager.removeImageRecord(record.getRecordKey(), beforeImages);
+        if (val.isEmpty()) {
+          // it's a deleted record.
+          if (existed != null) {
+            // a real record was deleted.
+            existed.setRowKind(RowKind.DELETE);
+            this.currentImage = existed;
+            return true;
+          }
+        } else {
+          IndexedRecord newAvroVal = val.get();
+          if (existed == null) {
+            // a new record is inserted.
+            RowData newRow = (RowData) 
avroToRowDataConverter.convert(newAvroVal);
+            newRow.setRowKind(RowKind.INSERT);
+            this.currentImage = newRow;
+            return true;
+          } else {
+            // an existing record is updated.
+            GenericRecord historyAvroRecord = (GenericRecord) 
rowDataToAvroConverter.convert(tableSchema, existed);
+            HoodieRecord<IndexedRecord> merged = 
mergeRowWithLog(historyAvroRecord, record).get();
+            if (merged.getData() != historyAvroRecord) {
+              // update happens
+              existed.setRowKind(RowKind.UPDATE_BEFORE);
+              this.currentImage = existed;
+
+              RowData mergedRow = (RowData) 
avroToRowDataConverter.convert(merged.getData());
+              mergedRow.setRowKind(RowKind.UPDATE_AFTER);
+              this.imageManager.updateImageRecord(record.getRecordKey(), 
beforeImages, mergedRow);
+              this.sideImage = mergedRow;
+
+              return true;
+            }
+          }
+        }
+      }
+      return false;
+    }
+
+    @Override
+    public RowData next() {
+      return this.projection != null ? 
this.projection.project(this.currentImage) : this.currentImage;
+    }
+
+    @Override
+    public void close() {
+      this.scanner.close();
+      this.imageManager.close();
+    }
+
+    @SuppressWarnings("unchecked")
+    private Option<HoodieRecord<IndexedRecord>> mergeRowWithLog(GenericRecord 
historyAvroRecord, HoodieRecord<?> newRecord) {
+      HoodieAvroIndexedRecord historyAvroIndexedRecord = new 
HoodieAvroIndexedRecord(historyAvroRecord);
+      try {
+        return recordMerger.merge(historyAvroIndexedRecord, tableSchema, 
newRecord, tableSchema, payloadProps).map(Pair::getLeft);
+      } catch (IOException e) {
+        throw new HoodieIOException("Merge base and delta payloads exception", 
e);
+      }
+    }
+  }
+
   abstract static class BaseImageIterator implements ClosableIterator<RowData> 
{
     private final Schema requiredSchema;
     private final int[] requiredPos;
@@ -673,6 +824,33 @@ public class CdcInputFormat extends MergeOnReadInputFormat 
{
       }
     }
 
+    public void updateImageRecord(
+        String recordKey,
+        ExternalSpillableMap<String, byte[]> cache,
+        RowData row) {
+      ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
+      try {
+        serializer.serialize(row, new BytesArrayOutputView(baos));
+      } catch (IOException e) {
+        throw new HoodieException("Serialize row data into bytes exception", 
e);
+      }
+      cache.put(recordKey, baos.toByteArray());
+    }
+
+    public RowData removeImageRecord(
+        String recordKey,
+        ExternalSpillableMap<String, byte[]> cache) {
+      byte[] bytes = cache.remove(recordKey);
+      if (bytes == null) {
+        return null;
+      }
+      try {
+        return serializer.deserialize(new BytesArrayInputView(bytes));
+      } catch (IOException e) {
+        throw new HoodieException("Deserialize bytes into row data exception", 
e);
+      }
+    }
+
     @Override
     public void close() {
       this.cache.values().forEach(ExternalSpillableMap::close);
@@ -741,7 +919,13 @@ public class CdcInputFormat extends MergeOnReadInputFormat 
{
         .collect(Collectors.toList()));
     String basePath = 
fileSlice.getBaseFile().map(BaseFile::getPath).orElse(null);
     return new MergeOnReadInputSplit(0, basePath, logPaths,
-        fileSlice.getBaseInstantTime(), tablePath, maxCompactionMemoryInBytes,
+        fileSlice.getLatestInstantTime(), tablePath, 
maxCompactionMemoryInBytes,
         FlinkOptions.REALTIME_PAYLOAD_COMBINE, null, fileSlice.getFileId());
   }
+
+  public static MergeOnReadInputSplit singleLogFile2Split(String tablePath, 
String filePath, long maxCompactionMemoryInBytes) {
+    return new MergeOnReadInputSplit(0, null, 
Option.of(Collections.singletonList(filePath)),
+        FSUtils.getDeltaCommitTimeFromLogPath(new StoragePath(filePath)), 
tablePath, maxCompactionMemoryInBytes,
+        FlinkOptions.REALTIME_PAYLOAD_COMBINE, null, 
FSUtils.getFileIdFromLogPath(new StoragePath(filePath)));
+  }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
index 9946eae930c..66b44576f80 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
@@ -149,7 +149,7 @@ public class MergeOnReadInputFormat
    */
   private boolean closed = true;
 
-  private final InternalSchemaManager internalSchemaManager;
+  protected final InternalSchemaManager internalSchemaManager;
 
   protected MergeOnReadInputFormat(
       Configuration conf,
@@ -205,7 +205,7 @@ public class MergeOnReadInputFormat
       }
     } else if (!split.getBasePath().isPresent()) {
       // log files only
-      if (OptionsResolver.emitChangelog(conf)) {
+      if (OptionsResolver.emitDeletes(conf)) {
         return new LogFileOnlyIterator(getUnMergedLogFileIterator(split));
       } else {
         return new LogFileOnlyIterator(getLogFileIterator(split));
@@ -312,7 +312,7 @@ public class MergeOnReadInputFormat
     try {
       return getBaseFileIterator(path, IntStream.range(0, 
this.tableState.getRowType().getFieldCount()).toArray());
     } catch (IOException e) {
-      throw new HoodieException("Get reader error for path: " + path);
+      throw new HoodieException("Get reader error for path: " + path, e);
     }
   }
 
@@ -441,13 +441,8 @@ public class MergeOnReadInputFormat
       @Override
       public boolean hasNext() {
         while (recordsIterator.hasNext()) {
-          Option<IndexedRecord> curAvroRecord = null;
           final HoodieAvroRecord<?> hoodieRecord = (HoodieAvroRecord) 
recordsIterator.next();
-          try {
-            curAvroRecord = hoodieRecord.getData().getInsertValue(tableSchema);
-          } catch (IOException e) {
-            throw new HoodieException("Get avro insert value error for key: " 
+ hoodieRecord.getRecordKey(), e);
-          }
+          Option<IndexedRecord> curAvroRecord = getInsertVal(hoodieRecord, 
tableSchema);
           if (curAvroRecord.isPresent()) {
             final IndexedRecord avroRecord = curAvroRecord.get();
             GenericRecord requiredAvroRecord = buildAvroRecordBySchema(
@@ -475,6 +470,14 @@ public class MergeOnReadInputFormat
     };
   }
 
+  protected static Option<IndexedRecord> getInsertVal(HoodieAvroRecord<?> 
hoodieRecord, Schema tableSchema) {
+    try {
+      return hoodieRecord.getData().getInsertValue(tableSchema);
+    } catch (IOException e) {
+      throw new HoodieException("Get avro insert value error for key: " + 
hoodieRecord.getRecordKey(), e);
+    }
+  }
+
   protected ClosableIterator<RowData> 
getFullLogFileIterator(MergeOnReadInputSplit split) {
     final Schema tableSchema = new 
Schema.Parser().parse(tableState.getAvroSchema());
     final AvroToRowDataConverters.AvroToRowDataConverter 
avroToRowDataConverter =
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BucketStreamWriteFunctionWrapper.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BucketStreamWriteFunctionWrapper.java
index df648df9ac0..319ff170ca3 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BucketStreamWriteFunctionWrapper.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BucketStreamWriteFunctionWrapper.java
@@ -169,6 +169,17 @@ public class BucketStreamWriteFunctionWrapper<I> 
implements TestFunctionWrapper<
     }
   }
 
+  @Override
+  public void inlineCompaction() {
+    if (asyncCompaction) {
+      try {
+        compactFunctionWrapper.compact(1); // always uses a constant 
checkpoint ID.
+      } catch (Exception e) {
+        throw new HoodieException(e);
+      }
+    }
+  }
+
   public void close() throws Exception {
     coordinator.close();
     ioManager.close();
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
index c65e42f1521..fff916bbcc1 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
@@ -216,6 +216,17 @@ public class StreamWriteFunctionWrapper<I> implements 
TestFunctionWrapper<I> {
     }
   }
 
+  @Override
+  public void inlineCompaction() {
+    if (asyncCompaction) {
+      try {
+        compactFunctionWrapper.compact(1); // always uses a constant 
checkpoint ID.
+      } catch (Exception e) {
+        throw new HoodieException(e);
+      }
+    }
+  }
+
   public void jobFailover() throws Exception {
     coordinatorFails();
     subTaskFails(0, 0);
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestFunctionWrapper.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestFunctionWrapper.java
index faee168bf25..860b3aee9c0 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestFunctionWrapper.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestFunctionWrapper.java
@@ -68,6 +68,14 @@ public interface TestFunctionWrapper<I> {
    */
   void checkpointComplete(long checkpointId);
 
+  /**
+   * Keep this interface for the batch inline compaction job. The batch 
pipeline triggers the commit of the Hudi table
+   * with "endInput" events in the coordinator, whereas there is no good 
chance to plug in the compaction sub-pipeline.
+   */
+  default void inlineCompaction() {
+    throw new UnsupportedOperationException();
+  }
+
   /**
    * Triggers the job failover, including the coordinator and the write tasks.
    */
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index d9f3badf1fb..7cd1f4b3e7b 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -1459,6 +1459,46 @@ public class ITTestHoodieDataSource {
     assertRowsEquals(result2.subList(result2.size() - 2, result2.size()), 
"[-U[1], +U[2]]");
   }
 
+  @Test
+  void testReadChangelogIncrementalMor() throws Exception {
+    TableEnvironment tableEnv = streamTableEnv;
+    Configuration conf = 
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    conf.setString(FlinkOptions.TABLE_NAME, "t1");
+    conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
+    conf.setBoolean(FlinkOptions.READ_CDC_FROM_CHANGELOG, false); // calculate 
the changes on the fly
+    conf.setBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED, true);  // for batch 
upsert
+    conf.setBoolean(FlinkOptions.CDC_ENABLED, true);
+
+    // write 3 batches of the same data set
+    TestData.writeDataAsBatch(TestData.dataSetInsert(1, 2), conf);
+    TestData.writeDataAsBatch(TestData.dataSetInsert(1, 2), conf);
+    TestData.writeDataAsBatch(TestData.dataSetInsert(1, 2), conf);
+
+    String latestCommit = 
TestUtils.getLastCompleteInstant(tempFile.getAbsolutePath());
+
+    String hoodieTableDDL = sql("t1")
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .option(FlinkOptions.READ_START_COMMIT, latestCommit)
+        .option(FlinkOptions.CDC_ENABLED, true)
+        .end();
+    tableEnv.executeSql(hoodieTableDDL);
+
+    List<Row> result1 = CollectionUtil.iterableToList(
+        () -> tableEnv.sqlQuery("select * from t1").execute().collect());
+    assertRowsEquals(result1, TestData.dataSetUpsert(2, 1));
+
+    // write another 10 batches of dataset
+    for (int i = 0; i < 10; i++) {
+      TestData.writeDataAsBatch(TestData.dataSetInsert(1, 2), conf);
+    }
+
+    String firstCommit = 
TestUtils.getFirstCompleteInstant(tempFile.getAbsolutePath());
+    final String query = String.format("select count(*) from t1/*+ 
options('read.start-commit'='%s')*/", firstCommit);
+    List<Row> result2 = CollectionUtil.iterableToList(
+        () -> tableEnv.sqlQuery(query).execute().collect());
+    assertRowsEquals(result2.subList(result2.size() - 2, result2.size()), 
"[-U[1], +U[2]]");
+  }
+
   @ParameterizedTest
   @EnumSource(value = HoodieTableType.class)
   void testIncrementalReadArchivedCommits(HoodieTableType tableType) throws 
Exception {
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
index 235d2421c38..37c2c704a1c 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
@@ -101,7 +101,9 @@ public class TestInputFormat {
   void beforeEach(HoodieTableType tableType, Map<String, String> options) 
throws IOException {
     conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
     conf.setString(FlinkOptions.TABLE_TYPE, tableType.name());
-    conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false); // close 
the async compaction
+    if (!conf.contains(FlinkOptions.COMPACTION_ASYNC_ENABLED)) {
+      conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false); // by 
default close the async compaction
+    }
     options.forEach((key, value) -> conf.setString(key, value));
 
     StreamerUtil.initTableIfNotExists(conf);
@@ -875,6 +877,64 @@ public class TestInputFormat {
 
     assertThat(commits.size(), is(3));
 
+    testReadChangelogInternal(commits);
+  }
+
+  @Test
+  void testReadChangelogIncrementallyForMor() throws Exception {
+    Map<String, String> options = new HashMap<>();
+    options.put(FlinkOptions.QUERY_TYPE.key(), 
FlinkOptions.QUERY_TYPE_INCREMENTAL);
+    options.put(FlinkOptions.CDC_ENABLED.key(), "true");
+    options.put(FlinkOptions.INDEX_BOOTSTRAP_ENABLED.key(), "true");  // for 
batch update
+    options.put(FlinkOptions.READ_CDC_FROM_CHANGELOG.key(), "false"); // 
infers the data changes on the fly
+    beforeEach(HoodieTableType.MERGE_ON_READ, options);
+
+    // write 3 commits first
+    // write the same dataset 3 times to generate changelog
+    for (int i = 0; i < 3; i++) {
+      List<RowData> dataset = TestData.dataSetInsert(1, 2);
+      TestData.writeDataAsBatch(dataset, conf);
+    }
+
+    HoodieTableMetaClient metaClient = HoodieTestUtils.createMetaClient(
+        new 
HadoopStorageConfiguration(HadoopConfigurations.getHadoopConf(conf)), 
tempFile.getAbsolutePath());
+    List<String> commits = 
metaClient.getCommitsTimeline().filterCompletedInstants().getInstantsAsStream()
+        .map(HoodieInstant::getCompletionTime).collect(Collectors.toList());
+
+    assertThat(commits.size(), is(3));
+
+    testReadChangelogInternal(commits);
+  }
+
+  @Test
+  void testReadChangelogIncrementallyForMorWithCompaction() throws Exception {
+    Map<String, String> options = new HashMap<>();
+    options.put(FlinkOptions.QUERY_TYPE.key(), 
FlinkOptions.QUERY_TYPE_INCREMENTAL);
+    options.put(FlinkOptions.CDC_ENABLED.key(), "true");
+    options.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "true");
+    options.put(FlinkOptions.COMPACTION_DELTA_COMMITS.key(), "1");   // 
compact for every commit
+    options.put(FlinkOptions.INDEX_BOOTSTRAP_ENABLED.key(), "true"); // for 
batch update
+    beforeEach(HoodieTableType.MERGE_ON_READ, options);
+
+    // write 3 commits first
+    // write the same dataset 3 times to generate changelog
+    for (int i = 0; i < 3; i++) {
+      List<RowData> dataset = TestData.dataSetInsert(1, 2);
+      TestData.writeDataAsBatch(dataset, conf);
+    }
+
+    HoodieTableMetaClient metaClient = HoodieTestUtils.createMetaClient(
+        new 
HadoopStorageConfiguration(HadoopConfigurations.getHadoopConf(conf)), 
tempFile.getAbsolutePath());
+    List<String> commits = 
metaClient.getCommitsTimeline().filterCompletedInstants()
+        .filter(instant -> 
instant.getAction().equals(HoodieTimeline.COMMIT_ACTION)).getInstantsAsStream()
+        .map(HoodieInstant::getCompletionTime).collect(Collectors.toList());
+
+    assertThat(commits.size(), is(3));
+
+    testReadChangelogInternal(commits);
+  }
+
+  private void testReadChangelogInternal(List<String> commits) throws 
IOException {
     // only the start commit
     conf.setString(FlinkOptions.READ_START_COMMIT, commits.get(1));
     this.tableSource = getTableSource(conf);
@@ -884,7 +944,6 @@ public class TestInputFormat {
     List<RowData> actual1 = readData(inputFormat1);
     final List<RowData> expected1 = TestData.dataSetUpsert(2, 1, 2, 1);
     TestData.assertRowDataEquals(actual1, expected1);
-
     // only the start commit: earliest
     conf.setString(FlinkOptions.READ_START_COMMIT, 
FlinkOptions.START_COMMIT_EARLIEST);
     this.tableSource = getTableSource(conf);
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java
index 932d9972bf9..3845b6150e5 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java
@@ -132,6 +132,7 @@ public class TestCompactionUtil {
   void testScheduleCompaction() throws Exception {
     Map<String, String> options = new HashMap<>();
     options.put(FlinkOptions.COMPACTION_SCHEDULE_ENABLED.key(), "false");
+    options.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "false");
     options.put(FlinkOptions.COMPACTION_TRIGGER_STRATEGY.key(), 
FlinkOptions.TIME_ELAPSED);
     options.put(FlinkOptions.COMPACTION_DELTA_SECONDS.key(), "0");
     beforeEach(options);
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index 7b14614cd37..3dd5d183049 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -560,6 +560,8 @@ public class TestData {
     final OperatorEvent nextEvent = funcWrapper.getNextEvent();
     funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
 
+    funcWrapper.inlineCompaction();
+
     funcWrapper.close();
   }
 
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCRelation.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCRelation.scala
index a951a60698d..eb91c255c22 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCRelation.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCRelation.scala
@@ -80,7 +80,8 @@ class CDCRelation(
         .startInstant(startInstant)
         .endInstant(endInstant)
         .nullableBoundary(true)
-        .rangeType(InstantRange.RangeType.OPEN_CLOSED).build())
+        .rangeType(InstantRange.RangeType.OPEN_CLOSED).build(),
+      false)
 
   override final def needConversion: Boolean = false
 

Reply via email to