This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new f3d4ce919d [HUDI-4916] Implement change log feed for Flink (#6840)
f3d4ce919d is described below

commit f3d4ce919d4909f9533255ee2a9a0450c8e44c73
Author: Danny Chan <[email protected]>
AuthorDate: Sat Oct 1 18:21:23 2022 +0800

    [HUDI-4916] Implement change log feed for Flink (#6840)
---
 .../org/apache/hudi/config/HoodieWriteConfig.java  |   5 +
 .../java/org/apache/hudi/io/HoodieCDCLogger.java   |   2 +-
 .../hudi/io/HoodieMergeHandleWithChangeLog.java    |   8 +-
 .../apache/hudi/client/HoodieFlinkWriteClient.java |  50 +-
 .../java/org/apache/hudi/io/FlinkAppendHandle.java |   6 -
 .../FlinkMergeAndReplaceHandleWithChangeLog.java}  |  51 +-
 .../hudi/io/FlinkMergeHandleWithChangeLog.java}    |  52 +-
 .../apache/hudi/io/FlinkWriteHandleFactory.java    | 283 ++++++++
 .../java/org/apache/hudi/io/MiniBatchHandle.java   |   8 -
 .../hudi/common/model/WriteOperationType.java      |  14 +
 .../hudi/common/table/HoodieTableConfig.java       |   4 +-
 .../hudi/common/table/cdc/HoodieCDCExtractor.java  | 100 ++-
 .../hudi/common/table/cdc/HoodieCDCFileSplit.java  |  31 +-
 .../cdc/HoodieCDCSupplementalLoggingMode.java      |   4 +-
 .../table/log/HoodieCDCLogRecordIterator.java      |  14 +-
 .../table/log/HoodieUnMergedLogRecordScanner.java  |   6 +-
 .../apache/hudi/configuration/FlinkOptions.java    |  22 +
 .../apache/hudi/configuration/OptionsResolver.java |   9 +
 .../apache/hudi/source/IncrementalInputSplits.java | 114 +++-
 .../hudi/source/StreamReadMonitoringFunction.java  |  11 +-
 .../org/apache/hudi/table/HoodieTableSource.java   |  34 +-
 .../org/apache/hudi/table/format/FormatUtils.java  |  20 +
 .../hudi/table/format/cdc/CdcInputFormat.java      | 723 +++++++++++++++++++++
 .../hudi/table/format/cdc/CdcInputSplit.java       |  61 ++
 .../table/format/mor/MergeOnReadInputFormat.java   | 166 +++--
 .../table/format/mor/MergeOnReadInputSplit.java    |   2 +-
 .../org/apache/hudi/util/RowDataProjection.java    |   1 +
 .../java/org/apache/hudi/util/StreamerUtil.java    |   4 +
 .../apache/hudi/table/ITTestHoodieDataSource.java  |  36 +
 .../apache/hudi/table/format/TestInputFormat.java  |  95 ++-
 .../test/java/org/apache/hudi/utils/TestData.java  |  30 +
 .../scala/org/apache/hudi/cdc/CDCRelation.scala    |  15 +-
 .../scala/org/apache/hudi/cdc/HoodieCDCRDD.scala   |  32 +-
 33 files changed, 1732 insertions(+), 281 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index fd7972a719..3b3995c8f8 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -975,6 +975,11 @@ public class HoodieWriteConfig extends HoodieConfig {
     return getString(KEYGENERATOR_CLASS_NAME);
   }
 
+  public boolean isCDCEnabled() {
+    return getBooleanOrDefault(
+        HoodieTableConfig.CDC_ENABLED, 
HoodieTableConfig.CDC_ENABLED.defaultValue());
+  }
+
   public boolean isConsistentLogicalTimestampEnabled() {
     return 
getBooleanOrDefault(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED);
   }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java
index f57b195c76..303eea76db 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java
@@ -95,7 +95,7 @@ public class HoodieCDCLogger implements Closeable {
           ? HoodieRecord.RECORD_KEY_METADATA_FIELD
           : tableConfig.getRecordKeyFieldProp();
       this.cdcWriter = cdcWriter;
-      this.cdcSupplementalLoggingMode = config.getCDCSupplementalLoggingMode();
+      this.cdcSupplementalLoggingMode = 
tableConfig.cdcSupplementalLoggingMode();
       this.cdcSchema = HoodieCDCUtils.schemaBySupplementalLoggingMode(
           cdcSupplementalLoggingMode,
           dataSchema
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java
index 12e48ffbb4..910bc42158 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java
@@ -21,6 +21,7 @@ package org.apache.hudi.io;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieOperation;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
@@ -75,14 +76,17 @@ public class HoodieMergeHandleWithChangeLog<T extends 
HoodieRecordPayload, I, K,
   protected boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord, 
GenericRecord oldRecord, Option<IndexedRecord> indexedRecord) {
     final boolean result = super.writeUpdateRecord(hoodieRecord, oldRecord, 
indexedRecord);
     if (result) {
-      cdcLogger.put(hoodieRecord, oldRecord, indexedRecord);
+      boolean isDelete = HoodieOperation.isDelete(hoodieRecord.getOperation());
+      cdcLogger.put(hoodieRecord, oldRecord, isDelete ? Option.empty() : 
indexedRecord);
     }
     return result;
   }
 
   protected void writeInsertRecord(HoodieRecord<T> hoodieRecord, 
Option<IndexedRecord> insertRecord) {
     super.writeInsertRecord(hoodieRecord, insertRecord);
-    cdcLogger.put(hoodieRecord, null, insertRecord);
+    if (!HoodieOperation.isDelete(hoodieRecord.getOperation())) {
+      cdcLogger.put(hoodieRecord, null, insertRecord);
+    }
   }
 
   @Override
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index 53a5799508..191eb003b9 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -27,10 +27,8 @@ import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
-import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.TableServiceType;
 import org.apache.hudi.common.model.WriteOperationType;
@@ -46,12 +44,7 @@ import org.apache.hudi.exception.HoodieCommitException;
 import org.apache.hudi.exception.HoodieNotSupportedException;
 import org.apache.hudi.index.FlinkHoodieIndexFactory;
 import org.apache.hudi.index.HoodieIndex;
-import org.apache.hudi.io.FlinkAppendHandle;
-import org.apache.hudi.io.FlinkConcatAndReplaceHandle;
-import org.apache.hudi.io.FlinkConcatHandle;
-import org.apache.hudi.io.FlinkCreateHandle;
-import org.apache.hudi.io.FlinkMergeAndReplaceHandle;
-import org.apache.hudi.io.FlinkMergeHandle;
+import org.apache.hudi.io.FlinkWriteHandleFactory;
 import org.apache.hudi.io.HoodieWriteHandle;
 import org.apache.hudi.io.MiniBatchHandle;
 import org.apache.hudi.metadata.FlinkHoodieBackedTableMetadataWriter;
@@ -566,41 +559,12 @@ public class HoodieFlinkWriteClient<T extends 
HoodieRecordPayload> extends
       String instantTime,
       HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, 
List<WriteStatus>> table,
       Iterator<HoodieRecord<T>> recordItr) {
-    final HoodieRecordLocation loc = record.getCurrentLocation();
-    final String fileID = loc.getFileId();
-    final String partitionPath = record.getPartitionPath();
-    final boolean insertClustering = config.allowDuplicateInserts();
-
-    if (bucketToHandles.containsKey(fileID)) {
-      MiniBatchHandle lastHandle = (MiniBatchHandle) 
bucketToHandles.get(fileID);
-      if (lastHandle.shouldReplace()) {
-        HoodieWriteHandle<?, ?, ?, ?> writeHandle = insertClustering
-            ? new FlinkConcatAndReplaceHandle<>(config, instantTime, table, 
recordItr, partitionPath, fileID,
-                table.getTaskContextSupplier(), lastHandle.getWritePath())
-            : new FlinkMergeAndReplaceHandle<>(config, instantTime, table, 
recordItr, partitionPath, fileID,
-                table.getTaskContextSupplier(), lastHandle.getWritePath());
-        this.bucketToHandles.put(fileID, writeHandle); // override with new 
replace handle
-        return writeHandle;
-      }
-    }
-
-    final boolean isDelta = 
table.getMetaClient().getTableType().equals(HoodieTableType.MERGE_ON_READ);
-    final HoodieWriteHandle<?, ?, ?, ?> writeHandle;
-    if (isDelta) {
-      writeHandle = new FlinkAppendHandle<>(config, instantTime, table, 
partitionPath, fileID, recordItr,
-          table.getTaskContextSupplier());
-    } else if (loc.getInstantTime().equals("I")) {
-      writeHandle = new FlinkCreateHandle<>(config, instantTime, table, 
partitionPath,
-          fileID, table.getTaskContextSupplier());
-    } else {
-      writeHandle = insertClustering
-          ? new FlinkConcatHandle<>(config, instantTime, table, recordItr, 
partitionPath,
-              fileID, table.getTaskContextSupplier())
-          : new FlinkMergeHandle<>(config, instantTime, table, recordItr, 
partitionPath,
-              fileID, table.getTaskContextSupplier());
-    }
-    this.bucketToHandles.put(fileID, writeHandle);
-    return writeHandle;
+    // caution: it's not a good practice to modify the handles internal.
+    FlinkWriteHandleFactory.Factory<T,
+        List<HoodieRecord<T>>,
+        List<HoodieKey>,
+        List<WriteStatus>> writeHandleFactory = 
FlinkWriteHandleFactory.getFactory(table.getMetaClient().getTableConfig(), 
config);
+    return writeHandleFactory.create(this.bucketToHandles, record, config, 
instantTime, table, recordItr);
   }
 
   public HoodieFlinkTable<T> getHoodieTable() {
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java
index b514896aa1..2258375fdd 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java
@@ -120,10 +120,4 @@ public class FlinkAppendHandle<T extends 
HoodieRecordPayload, I, K, O>
   public Path getWritePath() {
     return writer.getLogFile().getPath();
   }
-
-  @Override
-  public boolean shouldReplace() {
-    // log files can append new data buffer directly
-    return false;
-  }
 }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandleWithChangeLog.java
similarity index 56%
copy from 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java
copy to 
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandleWithChangeLog.java
index 12e48ffbb4..62b5481cf9 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandleWithChangeLog.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -20,49 +20,36 @@ package org.apache.hudi.io;
 
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.engine.TaskContextSupplier;
-import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieOperation;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
 import org.apache.hudi.common.table.log.AppendResult;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.keygen.BaseKeyGenerator;
 import org.apache.hudi.table.HoodieTable;
 
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.fs.Path;
 
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 
 /**
- * A merge handle that supports logging change logs.
+ * A flink merge and replace handle that supports logging change logs.
+ *
+ * <p>The cdc about logic is copied from {@link 
HoodieMergeHandleWithChangeLog},
+ * we should refactor it out when there are good abstractions.
  */
-public class HoodieMergeHandleWithChangeLog<T extends HoodieRecordPayload, I, 
K, O> extends HoodieMergeHandle<T, I, K, O> {
-  protected final HoodieCDCLogger cdcLogger;
+public class FlinkMergeAndReplaceHandleWithChangeLog<T extends 
HoodieRecordPayload, I, K, O>
+    extends FlinkMergeAndReplaceHandle<T, I, K, O> {
+  private final HoodieCDCLogger cdcLogger;
 
-  public HoodieMergeHandleWithChangeLog(HoodieWriteConfig config, String 
instantTime, HoodieTable<T, I, K, O> hoodieTable,
-                                        Iterator<HoodieRecord<T>> recordItr, 
String partitionPath, String fileId,
-                                        TaskContextSupplier 
taskContextSupplier, Option<BaseKeyGenerator> keyGeneratorOpt) {
-    super(config, instantTime, hoodieTable, recordItr, partitionPath, fileId, 
taskContextSupplier, keyGeneratorOpt);
-    this.cdcLogger = new HoodieCDCLogger(
-        instantTime,
-        config,
-        hoodieTable.getMetaClient().getTableConfig(),
-        tableSchema,
-        createLogWriter(instantTime, HoodieCDCUtils.CDC_LOGFILE_SUFFIX),
-        IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config));
-  }
-
-  /**
-   * Called by compactor code path.
-   */
-  public HoodieMergeHandleWithChangeLog(HoodieWriteConfig config, String 
instantTime, HoodieTable<T, I, K, O> hoodieTable,
-                                        Map<String, HoodieRecord<T>> 
keyToNewRecords, String partitionPath, String fileId,
-                                        HoodieBaseFile dataFileToBeMerged, 
TaskContextSupplier taskContextSupplier, Option<BaseKeyGenerator> 
keyGeneratorOpt) {
-    super(config, instantTime, hoodieTable, keyToNewRecords, partitionPath, 
fileId, dataFileToBeMerged, taskContextSupplier, keyGeneratorOpt);
+  public FlinkMergeAndReplaceHandleWithChangeLog(HoodieWriteConfig config, 
String instantTime, HoodieTable<T, I, K, O> hoodieTable,
+                                                 Iterator<HoodieRecord<T>> 
recordItr, String partitionPath, String fileId,
+                                                 TaskContextSupplier 
taskContextSupplier, Path basePath) {
+    super(config, instantTime, hoodieTable, recordItr, partitionPath, fileId, 
taskContextSupplier, basePath);
     this.cdcLogger = new HoodieCDCLogger(
         instantTime,
         config,
@@ -75,22 +62,24 @@ public class HoodieMergeHandleWithChangeLog<T extends 
HoodieRecordPayload, I, K,
   protected boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord, 
GenericRecord oldRecord, Option<IndexedRecord> indexedRecord) {
     final boolean result = super.writeUpdateRecord(hoodieRecord, oldRecord, 
indexedRecord);
     if (result) {
-      cdcLogger.put(hoodieRecord, oldRecord, indexedRecord);
+      boolean isDelete = HoodieOperation.isDelete(hoodieRecord.getOperation());
+      cdcLogger.put(hoodieRecord, oldRecord, isDelete ? Option.empty() : 
indexedRecord);
     }
     return result;
   }
 
   protected void writeInsertRecord(HoodieRecord<T> hoodieRecord, 
Option<IndexedRecord> insertRecord) {
     super.writeInsertRecord(hoodieRecord, insertRecord);
-    cdcLogger.put(hoodieRecord, null, insertRecord);
+    if (!HoodieOperation.isDelete(hoodieRecord.getOperation())) {
+      cdcLogger.put(hoodieRecord, null, insertRecord);
+    }
   }
 
   @Override
   public List<WriteStatus> close() {
     List<WriteStatus> writeStatuses = super.close();
     // if there are cdc data written, set the CDC-related information.
-    Option<AppendResult> cdcResult =
-        HoodieCDCLogger.writeCDCDataIfNeeded(cdcLogger, recordsWritten, 
insertRecordsWritten);
+    Option<AppendResult> cdcResult = cdcLogger.writeCDCData();
     HoodieCDCLogger.setCDCStatIfNeeded(writeStatuses.get(0).getStat(), 
cdcResult, partitionPath, fs);
     return writeStatuses;
   }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java
similarity index 56%
copy from 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java
copy to 
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java
index 12e48ffbb4..f6adbbf0d4 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -20,49 +20,39 @@ package org.apache.hudi.io;
 
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.engine.TaskContextSupplier;
-import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieOperation;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
 import org.apache.hudi.common.table.log.AppendResult;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.keygen.BaseKeyGenerator;
 import org.apache.hudi.table.HoodieTable;
 
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 
 /**
- * A merge handle that supports logging change logs.
+ * A flink merge handle that supports logging change logs.
+ *
+ * <p>The cdc about logic is copied from {@link 
HoodieMergeHandleWithChangeLog},
+ * we should refactor it out when there are good abstractions.
  */
-public class HoodieMergeHandleWithChangeLog<T extends HoodieRecordPayload, I, 
K, O> extends HoodieMergeHandle<T, I, K, O> {
-  protected final HoodieCDCLogger cdcLogger;
+public class FlinkMergeHandleWithChangeLog<T extends HoodieRecordPayload, I, 
K, O>
+    extends FlinkMergeHandle<T, I, K, O> {
+  private final HoodieCDCLogger cdcLogger;
 
-  public HoodieMergeHandleWithChangeLog(HoodieWriteConfig config, String 
instantTime, HoodieTable<T, I, K, O> hoodieTable,
-                                        Iterator<HoodieRecord<T>> recordItr, 
String partitionPath, String fileId,
-                                        TaskContextSupplier 
taskContextSupplier, Option<BaseKeyGenerator> keyGeneratorOpt) {
-    super(config, instantTime, hoodieTable, recordItr, partitionPath, fileId, 
taskContextSupplier, keyGeneratorOpt);
-    this.cdcLogger = new HoodieCDCLogger(
-        instantTime,
-        config,
-        hoodieTable.getMetaClient().getTableConfig(),
-        tableSchema,
-        createLogWriter(instantTime, HoodieCDCUtils.CDC_LOGFILE_SUFFIX),
-        IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config));
-  }
+  private static final Logger LOG = 
LogManager.getLogger(FlinkMergeHandleWithChangeLog.class);
 
-  /**
-   * Called by compactor code path.
-   */
-  public HoodieMergeHandleWithChangeLog(HoodieWriteConfig config, String 
instantTime, HoodieTable<T, I, K, O> hoodieTable,
-                                        Map<String, HoodieRecord<T>> 
keyToNewRecords, String partitionPath, String fileId,
-                                        HoodieBaseFile dataFileToBeMerged, 
TaskContextSupplier taskContextSupplier, Option<BaseKeyGenerator> 
keyGeneratorOpt) {
-    super(config, instantTime, hoodieTable, keyToNewRecords, partitionPath, 
fileId, dataFileToBeMerged, taskContextSupplier, keyGeneratorOpt);
+  public FlinkMergeHandleWithChangeLog(HoodieWriteConfig config, String 
instantTime, HoodieTable<T, I, K, O> hoodieTable,
+                                       Iterator<HoodieRecord<T>> recordItr, 
String partitionPath, String fileId,
+                                       TaskContextSupplier 
taskContextSupplier) {
+    super(config, instantTime, hoodieTable, recordItr, partitionPath, fileId, 
taskContextSupplier);
     this.cdcLogger = new HoodieCDCLogger(
         instantTime,
         config,
@@ -75,22 +65,24 @@ public class HoodieMergeHandleWithChangeLog<T extends 
HoodieRecordPayload, I, K,
   protected boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord, 
GenericRecord oldRecord, Option<IndexedRecord> indexedRecord) {
     final boolean result = super.writeUpdateRecord(hoodieRecord, oldRecord, 
indexedRecord);
     if (result) {
-      cdcLogger.put(hoodieRecord, oldRecord, indexedRecord);
+      boolean isDelete = HoodieOperation.isDelete(hoodieRecord.getOperation());
+      cdcLogger.put(hoodieRecord, oldRecord, isDelete ? Option.empty() : 
indexedRecord);
     }
     return result;
   }
 
   protected void writeInsertRecord(HoodieRecord<T> hoodieRecord, 
Option<IndexedRecord> insertRecord) {
     super.writeInsertRecord(hoodieRecord, insertRecord);
-    cdcLogger.put(hoodieRecord, null, insertRecord);
+    if (!HoodieOperation.isDelete(hoodieRecord.getOperation())) {
+      cdcLogger.put(hoodieRecord, null, insertRecord);
+    }
   }
 
   @Override
   public List<WriteStatus> close() {
     List<WriteStatus> writeStatuses = super.close();
     // if there are cdc data written, set the CDC-related information.
-    Option<AppendResult> cdcResult =
-        HoodieCDCLogger.writeCDCDataIfNeeded(cdcLogger, recordsWritten, 
insertRecordsWritten);
+    Option<AppendResult> cdcResult = cdcLogger.writeCDCData();
     HoodieCDCLogger.setCDCStatIfNeeded(writeStatuses.get(0).getStat(), 
cdcResult, partitionPath, fs);
     return writeStatuses;
   }
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkWriteHandleFactory.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkWriteHandleFactory.java
new file mode 100644
index 0000000000..fbc1c7ec55
--- /dev/null
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkWriteHandleFactory.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.hadoop.fs.Path;
+
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * Factory clazz for flink write handles.
+ */
+public class FlinkWriteHandleFactory {
+
+  /**
+   * Returns the write handle factory with given write config.
+   */
+  public static <T extends HoodieRecordPayload, I, K, O> Factory<T, I, K, O> 
getFactory(
+      HoodieTableConfig tableConfig,
+      HoodieWriteConfig writeConfig) {
+    if (writeConfig.allowDuplicateInserts()) {
+      return ClusterWriteHandleFactory.getInstance();
+    }
+    if (tableConfig.getTableType().equals(HoodieTableType.MERGE_ON_READ)) {
+      return DeltaCommitWriteHandleFactory.getInstance();
+    } else if (tableConfig.isCDCEnabled()) {
+      return CdcWriteHandleFactory.getInstance();
+    } else {
+      return CommitWriteHandleFactory.getInstance();
+    }
+  }
+
+  // -------------------------------------------------------------------------
+  //  Inner Class
+  // -------------------------------------------------------------------------
+
+  public interface Factory<T extends HoodieRecordPayload, I, K, O> {
+    /**
+     * Get or create a new write handle in order to reuse the file handles.
+     *
+     * <p>CAUTION: the method is not thread safe.
+     *
+     * @param bucketToHandles The existing write handles
+     * @param record          The first record in the bucket
+     * @param config          Write config
+     * @param instantTime     The instant time
+     * @param table           The table
+     * @param recordItr       Record iterator
+     *
+     * @return Existing write handle or create a new one
+     */
+    HoodieWriteHandle<?, ?, ?, ?> create(
+        final Map<String, HoodieWriteHandle<?, ?, ?, ?>> bucketToHandles,
+        HoodieRecord<T> record,
+        HoodieWriteConfig config,
+        String instantTime,
+        HoodieTable<T, I, K, O> table,
+        Iterator<HoodieRecord<T>> recordItr
+    );
+  }
+
+  /**
+   * Base clazz for commit write handle factory,
+   * it encapsulates the handle switching logic: INSERT OR UPSERT.
+   */
+  private abstract static class BaseCommitWriteHandleFactory<T extends 
HoodieRecordPayload, I, K, O> implements Factory<T, I, K, O> {
+    @Override
+    public HoodieWriteHandle<?, ?, ?, ?> create(
+        Map<String, HoodieWriteHandle<?, ?, ?, ?>> bucketToHandles,
+        HoodieRecord<T> record,
+        HoodieWriteConfig config,
+        String instantTime,
+        HoodieTable<T, I, K, O> table,
+        Iterator<HoodieRecord<T>> recordItr) {
+      final HoodieRecordLocation loc = record.getCurrentLocation();
+      final String fileID = loc.getFileId();
+      final String partitionPath = record.getPartitionPath();
+
+      if (bucketToHandles.containsKey(fileID)) {
+        MiniBatchHandle lastHandle = (MiniBatchHandle) 
bucketToHandles.get(fileID);
+        HoodieWriteHandle<?, ?, ?, ?> writeHandle =
+            createReplaceHandle(config, instantTime, table, recordItr, 
partitionPath, fileID, lastHandle.getWritePath());
+        bucketToHandles.put(fileID, writeHandle); // override with new replace 
handle
+        return writeHandle;
+      }
+
+      final HoodieWriteHandle<?, ?, ?, ?> writeHandle;
+      if (loc.getInstantTime().equals("I")) {
+        writeHandle = new FlinkCreateHandle<>(config, instantTime, table, 
partitionPath,
+            fileID, table.getTaskContextSupplier());
+      } else {
+        writeHandle = createMergeHandle(config, instantTime, table, recordItr, 
partitionPath, fileID);
+      }
+      bucketToHandles.put(fileID, writeHandle);
+      return writeHandle;
+    }
+
+    protected abstract HoodieWriteHandle<?, ?, ?, ?> createReplaceHandle(
+        HoodieWriteConfig config,
+        String instantTime,
+        HoodieTable<T, I, K, O> table,
+        Iterator<HoodieRecord<T>> recordItr,
+        String partitionPath,
+        String fileId,
+        Path basePath);
+
+    protected abstract HoodieWriteHandle<?, ?, ?, ?> createMergeHandle(
+        HoodieWriteConfig config,
+        String instantTime,
+        HoodieTable<T, I, K, O> table,
+        Iterator<HoodieRecord<T>> recordItr,
+        String partitionPath,
+        String fileId);
+  }
+
+  /**
+   * Write handle factory for commit.
+   */
+  private static class CommitWriteHandleFactory<T extends HoodieRecordPayload, 
I, K, O>
+      extends BaseCommitWriteHandleFactory<T, I, K, O> {
+    private static final CommitWriteHandleFactory<?, ?, ?, ?> INSTANCE = new 
CommitWriteHandleFactory<>();
+
+    @SuppressWarnings("unchecked")
+    public static <T extends HoodieRecordPayload, I, K, O> 
CommitWriteHandleFactory<T, I, K, O> getInstance() {
+      return (CommitWriteHandleFactory<T, I, K, O>) INSTANCE;
+    }
+
+    @Override
+    protected HoodieWriteHandle<?, ?, ?, ?> createReplaceHandle(
+        HoodieWriteConfig config,
+        String instantTime,
+        HoodieTable<T, I, K, O> table,
+        Iterator<HoodieRecord<T>> recordItr,
+        String partitionPath,
+        String fileId,
+        Path basePath) {
+      return new FlinkMergeAndReplaceHandle<>(config, instantTime, table, 
recordItr, partitionPath, fileId,
+          table.getTaskContextSupplier(), basePath);
+    }
+
+    @Override
+    protected HoodieWriteHandle<?, ?, ?, ?> createMergeHandle(
+        HoodieWriteConfig config,
+        String instantTime,
+        HoodieTable<T, I, K, O> table,
+        Iterator<HoodieRecord<T>> recordItr,
+        String partitionPath,
+        String fileId) {
+      return new FlinkMergeHandle<>(config, instantTime, table, recordItr, 
partitionPath,
+          fileId, table.getTaskContextSupplier());
+    }
+  }
+
+  /**
+   * Write handle factory for inline clustering.
+   */
+  private static class ClusterWriteHandleFactory<T extends 
HoodieRecordPayload, I, K, O>
+      extends BaseCommitWriteHandleFactory<T, I, K, O> {
+    private static final ClusterWriteHandleFactory<?, ?, ?, ?> INSTANCE = new 
ClusterWriteHandleFactory<>();
+
+    @SuppressWarnings("unchecked")
+    public static <T extends HoodieRecordPayload, I, K, O> 
ClusterWriteHandleFactory<T, I, K, O> getInstance() {
+      return (ClusterWriteHandleFactory<T, I, K, O>) INSTANCE;
+    }
+
+    @Override
+    protected HoodieWriteHandle<?, ?, ?, ?> createReplaceHandle(
+        HoodieWriteConfig config,
+        String instantTime,
+        HoodieTable<T, I, K, O> table,
+        Iterator<HoodieRecord<T>> recordItr,
+        String partitionPath,
+        String fileId,
+        Path basePath) {
+      return new FlinkConcatAndReplaceHandle<>(config, instantTime, table, 
recordItr, partitionPath, fileId,
+          table.getTaskContextSupplier(), basePath);
+    }
+
+    @Override
+    protected HoodieWriteHandle<?, ?, ?, ?> createMergeHandle(
+        HoodieWriteConfig config,
+        String instantTime,
+        HoodieTable<T, I, K, O> table,
+        Iterator<HoodieRecord<T>> recordItr,
+        String partitionPath,
+        String fileId) {
+      return new FlinkConcatHandle<>(config, instantTime, table, recordItr, 
partitionPath,
+          fileId, table.getTaskContextSupplier());
+    }
+  }
+
+  /**
+   * Write handle factory for commit, the write handle supports logging change 
logs.
+   */
+  private static class CdcWriteHandleFactory<T extends HoodieRecordPayload, I, 
K, O>
+      extends BaseCommitWriteHandleFactory<T, I, K, O> {
+    private static final CdcWriteHandleFactory<?, ?, ?, ?> INSTANCE = new 
CdcWriteHandleFactory<>();
+
+    @SuppressWarnings("unchecked")
+    public static <T extends HoodieRecordPayload, I, K, O> 
CdcWriteHandleFactory<T, I, K, O> getInstance() {
+      return (CdcWriteHandleFactory<T, I, K, O>) INSTANCE;
+    }
+
+    @Override
+    protected HoodieWriteHandle<?, ?, ?, ?> createReplaceHandle(
+        HoodieWriteConfig config,
+        String instantTime,
+        HoodieTable<T, I, K, O> table,
+        Iterator<HoodieRecord<T>> recordItr,
+        String partitionPath,
+        String fileId,
+        Path basePath) {
+      return new FlinkMergeAndReplaceHandleWithChangeLog<>(config, 
instantTime, table, recordItr, partitionPath, fileId,
+          table.getTaskContextSupplier(), basePath);
+    }
+
+    @Override
+    protected HoodieWriteHandle<?, ?, ?, ?> createMergeHandle(
+        HoodieWriteConfig config,
+        String instantTime,
+        HoodieTable<T, I, K, O> table,
+        Iterator<HoodieRecord<T>> recordItr,
+        String partitionPath,
+        String fileId) {
+      return new FlinkMergeHandleWithChangeLog<>(config, instantTime, table, 
recordItr, partitionPath,
+          fileId, table.getTaskContextSupplier());
+    }
+  }
+
+  /**
+   * Write handle factory for delta commit.
+   */
+  private static class DeltaCommitWriteHandleFactory<T extends 
HoodieRecordPayload, I, K, O> implements Factory<T, I, K, O> {
+    private static final DeltaCommitWriteHandleFactory<?, ?, ?, ?> INSTANCE = 
new DeltaCommitWriteHandleFactory<>();
+
+    @SuppressWarnings("unchecked")
+    public static <T extends HoodieRecordPayload, I, K, O> 
DeltaCommitWriteHandleFactory<T, I, K, O> getInstance() {
+      return (DeltaCommitWriteHandleFactory<T, I, K, O>) INSTANCE;
+    }
+
+    @Override
+    public HoodieWriteHandle<?, ?, ?, ?> create(
+        Map<String, HoodieWriteHandle<?, ?, ?, ?>> bucketToHandles,
+        HoodieRecord<T> record,
+        HoodieWriteConfig config,
+        String instantTime,
+        HoodieTable<T, I, K, O> table,
+        Iterator<HoodieRecord<T>> recordItr) {
+      final String fileID = record.getCurrentLocation().getFileId();
+      final String partitionPath = record.getPartitionPath();
+
+      final HoodieWriteHandle<?, ?, ?, ?> writeHandle =
+          new FlinkAppendHandle<>(config, instantTime, table, partitionPath, 
fileID, recordItr,
+              table.getTaskContextSupplier());
+      bucketToHandles.put(fileID, writeHandle);
+      return writeHandle;
+    }
+  }
+}
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/MiniBatchHandle.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/MiniBatchHandle.java
index 7d3d7fa5ff..91b8f6630c 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/MiniBatchHandle.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/MiniBatchHandle.java
@@ -43,12 +43,4 @@ public interface MiniBatchHandle {
    * Returns the write file path.
    */
   Path getWritePath();
-
-  /**
-   * Whether the old write file should be replaced with the same name new file
-   * using content merged with incremental new data batch.
-   */
-  default boolean shouldReplace() {
-    return true;
-  }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java
index f2f3809cf5..e0574d5119 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java
@@ -118,4 +118,18 @@ public enum WriteOperationType {
   public static boolean isOverwrite(WriteOperationType operationType) {
     return operationType == INSERT_OVERWRITE || operationType == 
INSERT_OVERWRITE_TABLE;
   }
+
+  /**
+   * Whether the operation changes the dataset.
+   */
+  public static boolean isDataChange(WriteOperationType operation) {
+    return operation == WriteOperationType.INSERT
+        || operation == WriteOperationType.UPSERT
+        || operation == WriteOperationType.DELETE
+        || operation == WriteOperationType.BULK_INSERT
+        || operation == WriteOperationType.DELETE_PARTITION
+        || operation == WriteOperationType.INSERT_OVERWRITE
+        || operation == WriteOperationType.INSERT_OVERWRITE_TABLE
+        || operation == WriteOperationType.BOOTSTRAP;
+  }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index 56862be803..f295b0019c 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -614,8 +614,8 @@ public class HoodieTableConfig extends HoodieConfig {
     return getBooleanOrDefault(CDC_ENABLED);
   }
 
-  public String cdcSupplementalLoggingMode() {
-    return getStringOrDefault(CDC_SUPPLEMENTAL_LOGGING_MODE);
+  public HoodieCDCSupplementalLoggingMode cdcSupplementalLoggingMode() {
+    return 
HoodieCDCSupplementalLoggingMode.parse(getStringOrDefault(CDC_SUPPLEMENTAL_LOGGING_MODE));
   }
 
   public String getKeyGeneratorClassName() {
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java
index 8cae90d59a..a9da1be77a 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java
@@ -28,12 +28,14 @@ import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.log.InstantRange;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
@@ -45,6 +47,7 @@ import org.apache.hadoop.fs.Path;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Locale;
@@ -60,17 +63,16 @@ import static 
org.apache.hudi.common.table.cdc.HoodieCDCInferCase.BASE_FILE_DELE
 import static 
org.apache.hudi.common.table.cdc.HoodieCDCInferCase.REPLACE_COMMIT;
 import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
 import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION;
-import static org.apache.hudi.common.table.timeline.HoodieTimeline.isInRange;
 import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
 
 /**
  * This class helps to extract all the information which will be used when CDC 
query.
  *
  * There are some steps:
- * 1. filter out the completed commit instants, and get the related 
[[HoodieCommitMetadata]] objects.
- * 2. initialize the [[HoodieTableFileSystemView]] by the touched data files.
+ * 1. filter out the completed commit instants, and get the related {@link 
HoodieCommitMetadata} objects.
+ * 2. initialize the {@link HoodieTableFileSystemView} by the touched data 
files.
  * 3. extract the cdc information:
- *   generate a [[CDCFileSplit]] object for each of the instant in 
(startInstant, endInstant)
+ *   generate a {@link HoodieCDCFileSplit} object for each of the instant in 
(startInstant, endInstant)
  *   and each of the file group which is touched in the range of instants.
  */
 public class HoodieCDCExtractor {
@@ -83,9 +85,7 @@ public class HoodieCDCExtractor {
 
   private final HoodieCDCSupplementalLoggingMode supplementalLoggingMode;
 
-  private final String startInstant;
-
-  private final String endInstant;
+  private final InstantRange instantRange;
 
   private Map<HoodieInstant, HoodieCommitMetadata> commits;
 
@@ -93,33 +93,27 @@ public class HoodieCDCExtractor {
 
   public HoodieCDCExtractor(
       HoodieTableMetaClient metaClient,
-      String startInstant,
-      String endInstant) {
+      InstantRange range) {
     this.metaClient = metaClient;
     this.basePath = metaClient.getBasePathV2();
     this.fs = metaClient.getFs().getFileSystem();
-    this.supplementalLoggingMode = HoodieCDCSupplementalLoggingMode.parse(
-        metaClient.getTableConfig().cdcSupplementalLoggingMode());
-    this.startInstant = startInstant;
-    this.endInstant = endInstant;
+    this.supplementalLoggingMode = 
metaClient.getTableConfig().cdcSupplementalLoggingMode();
+    this.instantRange = range;
     init();
   }
 
   private void init() {
     initInstantAndCommitMetadatas();
-    initFSView();
   }
 
   /**
    * At the granularity of a file group, trace the mapping between
    * each commit/instant and changes to this file group.
    */
-  public Map<HoodieFileGroupId, List<Pair<HoodieInstant, HoodieCDCFileSplit>>> 
extractCDCFileSplits() {
-    if (commits == null || fsView == null) {
-      throw new HoodieException("Fail to init CDCExtractor");
-    }
+  public Map<HoodieFileGroupId, List<HoodieCDCFileSplit>> 
extractCDCFileSplits() {
+    ValidationUtils.checkState(commits != null, "Empty commits");
 
-    Map<HoodieFileGroupId, List<Pair<HoodieInstant, HoodieCDCFileSplit>>> 
fgToCommitChanges = new HashMap<>();
+    Map<HoodieFileGroupId, List<HoodieCDCFileSplit>> fgToCommitChanges = new 
HashMap<>();
     for (HoodieInstant instant : commits.keySet()) {
       HoodieCommitMetadata commitMetadata = commits.get(instant);
 
@@ -134,7 +128,7 @@ public class HoodieCDCExtractor {
           HoodieCDCFileSplit changeFile =
               parseWriteStat(fileGroupId, instant, writeStat, 
commitMetadata.getOperationType());
           fgToCommitChanges.computeIfAbsent(fileGroupId, k -> new 
ArrayList<>());
-          fgToCommitChanges.get(fileGroupId).add(Pair.of(instant, changeFile));
+          fgToCommitChanges.get(fileGroupId).add(changeFile);
         });
       }
 
@@ -144,15 +138,15 @@ public class HoodieCDCExtractor {
         for (String partition : ptToReplacedFileId.keySet()) {
           List<String> fileIds = ptToReplacedFileId.get(partition);
           fileIds.forEach(fileId -> {
-            Option<FileSlice> latestFileSliceOpt = 
fsView.fetchLatestFileSlice(partition, fileId);
+            Option<FileSlice> latestFileSliceOpt = 
getOrCreateFsView().fetchLatestFileSlice(partition, fileId);
             if (latestFileSliceOpt.isPresent()) {
               HoodieFileGroupId fileGroupId = new HoodieFileGroupId(partition, 
fileId);
-              HoodieCDCFileSplit changeFile = new HoodieCDCFileSplit(
+              HoodieCDCFileSplit changeFile = new 
HoodieCDCFileSplit(instant.getTimestamp(),
                   REPLACE_COMMIT, null, latestFileSliceOpt, Option.empty());
               if (!fgToCommitChanges.containsKey(fileGroupId)) {
                 fgToCommitChanges.put(fileGroupId, new ArrayList<>());
               }
-              fgToCommitChanges.get(fileGroupId).add(Pair.of(instant, 
changeFile));
+              fgToCommitChanges.get(fileGroupId).add(changeFile);
             }
           });
         }
@@ -161,11 +155,23 @@ public class HoodieCDCExtractor {
     return fgToCommitChanges;
   }
 
+  /**
+   * Returns the fs view directly or creates a new one.
+   *
+   * <p>There is no need to initialize the fs view when supplemental logging 
mode is: WITH_BEFORE_AFTER.
+   */
+  private HoodieTableFileSystemView getOrCreateFsView() {
+    if (this.fsView == null) {
+      this.fsView = initFSView();
+    }
+    return this.fsView;
+  }
+
   /**
    * Parse the commit metadata between (startInstant, endInstant], and extract 
the touched partitions
    * and files to build the filesystem view.
    */
-  private void initFSView() {
+  private HoodieTableFileSystemView initFSView() {
     Set<String> touchedPartitions = new HashSet<>();
     for (Map.Entry<HoodieInstant, HoodieCommitMetadata> entry : 
commits.entrySet()) {
       HoodieCommitMetadata commitMetadata = entry.getValue();
@@ -182,7 +188,7 @@ public class HoodieCDCExtractor {
         Path partitionPath = FSUtils.getPartitionPath(basePath, 
touchedPartition);
         touchedFiles.addAll(Arrays.asList(fs.listStatus(partitionPath)));
       }
-      this.fsView = new HoodieTableFileSystemView(
+      return new HoodieTableFileSystemView(
           metaClient,
           metaClient.getCommitsTimeline().filterCompletedInstants(),
           touchedFiles.toArray(new FileStatus[0])
@@ -192,7 +198,6 @@ public class HoodieCDCExtractor {
     }
   }
 
-
   /**
    * Extract the required instants from all the instants between 
(startInstant, endInstant].
    *
@@ -206,12 +211,12 @@ public class HoodieCDCExtractor {
    */
   private void initInstantAndCommitMetadatas() {
     try {
-      List<String> requiredActions = Arrays.asList(COMMIT_ACTION, 
DELTA_COMMIT_ACTION, REPLACE_COMMIT_ACTION);
+      Set<String> requiredActions = new HashSet<>(Arrays.asList(COMMIT_ACTION, 
DELTA_COMMIT_ACTION, REPLACE_COMMIT_ACTION));
       HoodieActiveTimeline activeTimeLine = metaClient.getActiveTimeline();
-      Map<HoodieInstant, HoodieCommitMetadata> result = 
activeTimeLine.getInstants()
+      this.commits = activeTimeLine.getInstants()
           .filter(instant ->
               instant.isCompleted()
-                  && isInRange(instant.getTimestamp(), startInstant, 
endInstant)
+                  && instantRange.isInRange(instant.getTimestamp())
                   && 
requiredActions.contains(instant.getAction().toLowerCase(Locale.ROOT))
           ).map(instant -> {
             HoodieCommitMetadata commitMetadata;
@@ -228,25 +233,13 @@ public class HoodieCDCExtractor {
             }
             return Pair.of(instant, commitMetadata);
           }).filter(pair ->
-              maybeChangeData(pair.getRight().getOperationType())
+              
WriteOperationType.isDataChange(pair.getRight().getOperationType())
           ).collect(Collectors.toMap(Pair::getLeft, Pair::getRight));
-      this.commits = result;
     } catch (Exception e) {
       throw new HoodieIOException("Fail to get the commit metadata for CDC");
     }
   }
 
-  private Boolean maybeChangeData(WriteOperationType operation) {
-    return operation == WriteOperationType.INSERT
-        || operation == WriteOperationType.UPSERT
-        || operation == WriteOperationType.DELETE
-        || operation == WriteOperationType.BULK_INSERT
-        || operation == WriteOperationType.DELETE_PARTITION
-        || operation == WriteOperationType.INSERT_OVERWRITE
-        || operation == WriteOperationType.INSERT_OVERWRITE_TABLE
-        || operation == WriteOperationType.BOOTSTRAP;
-  }
-
   /**
    * Parse HoodieWriteStat, judge which type the file is, and what strategy 
should be used to parse CDC data.
    * Then build a [[HoodieCDCFileSplit]] object.
@@ -256,8 +249,9 @@ public class HoodieCDCExtractor {
       HoodieInstant instant,
       HoodieWriteStat writeStat,
       WriteOperationType operation) {
-    Path basePath = metaClient.getBasePathV2();
-    FileSystem fs = metaClient.getFs().getFileSystem();
+    final Path basePath = metaClient.getBasePathV2();
+    final FileSystem fs = metaClient.getFs().getFileSystem();
+    final String instantTs = instant.getTimestamp();
 
     HoodieCDCFileSplit cdcFileSplit;
     if (StringUtils.isNullOrEmpty(writeStat.getCdcPath())) {
@@ -268,35 +262,35 @@ public class HoodieCDCExtractor {
         if (operation == WriteOperationType.DELETE && writeStat.getNumWrites() 
== 0L
             && writeStat.getNumDeletes() != 0) {
           // This is a delete operation wherein all the records in this file 
group are deleted
-          // and no records have been writen out a new file.
+          // and no records have been written out a new file.
           // So, we find the previous file that this operation delete from, 
and treat each of
           // records as a deleted one.
-          HoodieBaseFile beforeBaseFile = fsView.getBaseFileOn(
+          HoodieBaseFile beforeBaseFile = getOrCreateFsView().getBaseFileOn(
               fileGroupId.getPartitionPath(), writeStat.getPrevCommit(), 
fileGroupId.getFileId()
           ).orElseThrow(() ->
               new HoodieIOException("Can not get the previous version of the 
base file")
           );
-          FileSlice beforeFileSlice = new FileSlice(fileGroupId, 
writeStat.getPrevCommit(), beforeBaseFile, new ArrayList<>());
-          cdcFileSplit = new HoodieCDCFileSplit(BASE_FILE_DELETE, null, 
Option.empty(), Option.of(beforeFileSlice));
+          FileSlice beforeFileSlice = new FileSlice(fileGroupId, 
writeStat.getPrevCommit(), beforeBaseFile, Collections.emptyList());
+          cdcFileSplit = new HoodieCDCFileSplit(instantTs, BASE_FILE_DELETE, 
null, Option.empty(), Option.of(beforeFileSlice));
         } else if (writeStat.getNumUpdateWrites() == 0L && 
writeStat.getNumDeletes() == 0
             && writeStat.getNumWrites() == writeStat.getNumInserts()) {
           // all the records in this file are new.
-          cdcFileSplit = new HoodieCDCFileSplit(BASE_FILE_INSERT, path);
+          cdcFileSplit = new HoodieCDCFileSplit(instantTs, BASE_FILE_INSERT, 
path);
         } else {
           throw new HoodieException("There should be a cdc log file.");
         }
       } else {
         // this is a log file
         Option<FileSlice> beforeFileSliceOpt = 
getDependentFileSliceForLogFile(fileGroupId, instant, path);
-        cdcFileSplit = new HoodieCDCFileSplit(LOG_FILE, path, 
beforeFileSliceOpt, Option.empty());
+        cdcFileSplit = new HoodieCDCFileSplit(instantTs, LOG_FILE, path, 
beforeFileSliceOpt, Option.empty());
       }
     } else {
       // this is a cdc log
       if 
(supplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.WITH_BEFORE_AFTER))
 {
-        cdcFileSplit = new HoodieCDCFileSplit(AS_IS, writeStat.getCdcPath());
+        cdcFileSplit = new HoodieCDCFileSplit(instantTs, AS_IS, 
writeStat.getCdcPath());
       } else {
         try {
-          HoodieBaseFile beforeBaseFile = fsView.getBaseFileOn(
+          HoodieBaseFile beforeBaseFile = getOrCreateFsView().getBaseFileOn(
               fileGroupId.getPartitionPath(), writeStat.getPrevCommit(), 
fileGroupId.getFileId()
           ).orElseThrow(() ->
               new HoodieIOException("Can not get the previous version of the 
base file")
@@ -307,7 +301,7 @@ public class HoodieCDCExtractor {
           if 
(supplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.OP_KEY)) {
             beforeFileSlice = new FileSlice(fileGroupId, 
writeStat.getPrevCommit(), beforeBaseFile, new ArrayList<>());
           }
-          cdcFileSplit = new HoodieCDCFileSplit(AS_IS, writeStat.getCdcPath(),
+          cdcFileSplit = new HoodieCDCFileSplit(instantTs, AS_IS, 
writeStat.getCdcPath(),
               Option.ofNullable(beforeFileSlice), 
Option.ofNullable(currentFileSlice));
         } catch (Exception e) {
           throw new HoodieException("Fail to parse HoodieWriteStat", e);
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCFileSplit.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCFileSplit.java
index 43ad3bf176..ae082b1945 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCFileSplit.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCFileSplit.java
@@ -21,6 +21,8 @@ package org.apache.hudi.common.table.cdc;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.util.Option;
 
+import org.jetbrains.annotations.NotNull;
+
 import java.io.Serializable;
 
 /**
@@ -39,43 +41,53 @@ import java.io.Serializable;
  * For `cdcInferCase` = {@link HoodieCDCInferCase#REPLACE_COMMIT}, `cdcFile` 
is null,
  * `beforeFileSlice` is the current version of the file slice.
  */
-public class HoodieCDCFileSplit implements Serializable {
+public class HoodieCDCFileSplit implements Serializable, 
Comparable<HoodieCDCFileSplit> {
+  /**
+   * The instant time at which the changes happened.
+   */
+  private final String instant;
 
   /**
-   * * the change type, which decide to how to retrieve the change data. more 
details see: `HoodieCDCLogicalFileType#`
+   * Flag that decides to how to retrieve the change data. More details see: 
`HoodieCDCLogicalFileType`.
    */
   private final HoodieCDCInferCase cdcInferCase;
 
   /**
-   * the file that the change data can be parsed from.
+   * The file that the change data can be parsed from.
    */
   private final String cdcFile;
 
   /**
-   * the file slice that are required when retrieve the before data.
+   * THe file slice that are required when retrieving the before data.
    */
   private final Option<FileSlice> beforeFileSlice;
 
   /**
-   * the file slice that are required when retrieve the after data.
+   * The file slice that are required when retrieving the after data.
    */
   private final Option<FileSlice> afterFileSlice;
 
-  public HoodieCDCFileSplit(HoodieCDCInferCase cdcInferCase, String cdcFile) {
-    this(cdcInferCase, cdcFile, Option.empty(), Option.empty());
+  public HoodieCDCFileSplit(String instant, HoodieCDCInferCase cdcInferCase, 
String cdcFile) {
+    this(instant, cdcInferCase, cdcFile, Option.empty(), Option.empty());
   }
 
   public HoodieCDCFileSplit(
+      String instant,
       HoodieCDCInferCase cdcInferCase,
       String cdcFile,
       Option<FileSlice> beforeFileSlice,
       Option<FileSlice> afterFileSlice) {
+    this.instant = instant;
     this.cdcInferCase = cdcInferCase;
     this.cdcFile = cdcFile;
     this.beforeFileSlice = beforeFileSlice;
     this.afterFileSlice = afterFileSlice;
   }
 
+  public String getInstant() {
+    return this.instant;
+  }
+
   public HoodieCDCInferCase getCdcInferCase() {
     return this.cdcInferCase;
   }
@@ -91,4 +103,9 @@ public class HoodieCDCFileSplit implements Serializable {
   public Option<FileSlice> getAfterFileSlice() {
     return this.afterFileSlice;
   }
+
+  @Override
+  public int compareTo(@NotNull HoodieCDCFileSplit o) {
+    return this.instant.compareTo(o.instant);
+  }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCSupplementalLoggingMode.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCSupplementalLoggingMode.java
index 35a232206f..13a51a4f07 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCSupplementalLoggingMode.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCSupplementalLoggingMode.java
@@ -28,8 +28,8 @@ import org.apache.hudi.exception.HoodieNotSupportedException;
  *
  * <ul>
  *   <li>OP_KEY: record keys, the reader needs to figure out the update before 
image and after image;</li>
- *   <li>OP_KEY: before images, the reader needs to figure out the update 
after images;</li>
- *   <li>OP_KEY: before and after images, the reader can generate the details 
directly from the log.</li>
+ *   <li>WITH_BEFORE: before images, the reader needs to figure out the update 
after images;</li>
+ *   <li>WITH_BEFORE_AFTER: before and after images, the reader can generate 
the details directly from the log.</li>
  * </ul>
  */
 public enum HoodieCDCSupplementalLoggingMode {
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieCDCLogRecordIterator.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieCDCLogRecordIterator.java
index 18db9850a0..f194ddf8f4 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieCDCLogRecordIterator.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieCDCLogRecordIterator.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.common.table.log;
 
+import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.table.log.block.HoodieDataBlock;
 import org.apache.hudi.common.util.ClosableIterator;
@@ -26,6 +27,7 @@ import org.apache.hudi.exception.HoodieIOException;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.IndexedRecord;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
@@ -33,18 +35,22 @@ import java.io.IOException;
 
 public class HoodieCDCLogRecordIterator implements 
ClosableIterator<IndexedRecord> {
 
-  private final HoodieLogFile cdcLogFile;
-
   private final HoodieLogFormat.Reader reader;
 
   private ClosableIterator<IndexedRecord> itr;
 
+  public HoodieCDCLogRecordIterator(
+      Configuration hadoopConf,
+      Path cdcLogPath,
+      Schema cdcSchema) throws IOException {
+    this(FSUtils.getFs(cdcLogPath, hadoopConf), cdcLogPath, cdcSchema);
+  }
+
   public HoodieCDCLogRecordIterator(
       FileSystem fs,
       Path cdcLogPath,
       Schema cdcSchema) throws IOException {
-    this.cdcLogFile = new HoodieLogFile(fs.getFileStatus(cdcLogPath));
-    this.reader = new HoodieLogFileReader(fs, cdcLogFile, cdcSchema,
+    this.reader = new HoodieLogFileReader(fs, new 
HoodieLogFile(fs.getFileStatus(cdcLogPath)), cdcSchema,
         HoodieLogFileReader.DEFAULT_BUFFER_SIZE, false);
   }
 
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
index 8ea34d6f2f..16d3a7a3e4 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
@@ -21,12 +21,14 @@ package org.apache.hudi.common.table.log;
 import org.apache.hudi.common.model.DeleteRecord;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
 import org.apache.hudi.common.util.Option;
 
 import org.apache.avro.Schema;
 import org.apache.hadoop.fs.FileSystem;
 
 import java.util.List;
+import java.util.stream.Collectors;
 
 /**
  * A scanner used to scan hoodie unmerged log records.
@@ -96,7 +98,9 @@ public class HoodieUnMergedLogRecordScanner extends 
AbstractHoodieLogRecordReade
     }
 
     public Builder withLogFilePaths(List<String> logFilePaths) {
-      this.logFilePaths = logFilePaths;
+      this.logFilePaths = logFilePaths.stream()
+          .filter(p -> !p.endsWith(HoodieCDCUtils.CDC_LOGFILE_SUFFIX))
+          .collect(Collectors.toList());
       return this;
     }
 
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index a9e10d3e55..27f19f4a66 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -26,6 +26,7 @@ import org.apache.hudi.common.model.EventTimeAvroPayload;
 import org.apache.hudi.common.model.HoodieCleaningPolicy;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
@@ -122,6 +123,10 @@ public class FlinkOptions extends HoodieConfig {
       .withDescription("The default partition name in case the dynamic 
partition"
           + " column value is null/empty string");
 
+  // ------------------------------------------------------------------------
+  //  Changelog Capture Options
+  // ------------------------------------------------------------------------
+
   public static final ConfigOption<Boolean> CHANGELOG_ENABLED = ConfigOptions
       .key("changelog.enabled")
       .booleanType()
@@ -133,6 +138,23 @@ public class FlinkOptions extends HoodieConfig {
           + "The semantics is best effort because the compaction job would 
finally merge all changes of a record into one.\n"
           + " default false to have UPSERT semantics");
 
+  public static final ConfigOption<Boolean> CDC_ENABLED = ConfigOptions
+      .key("cdc.enabled")
+      .booleanType()
+      .defaultValue(false)
+      .withFallbackKeys(HoodieTableConfig.CDC_ENABLED.key())
+      .withDescription("When enable, persist the change data if necessary, and 
can be queried as a CDC query mode");
+
+  public static final ConfigOption<String> SUPPLEMENTAL_LOGGING_MODE = 
ConfigOptions
+      .key("cdc.supplemental.logging.mode")
+      .stringType()
+      .defaultValue("cdc_data_before_after") // default record all the change 
log images
+      .withFallbackKeys(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE.key())
+      .withDescription("The supplemental logging mode:"
+          + "1) 'cdc_op_key': persist the 'op' and the record key only,"
+          + "2) 'cdc_data_before': persist the additional 'before' image,"
+          + "3) 'cdc_data_before_after': persist the 'before' and 'after' 
images at the same time");
+
   // ------------------------------------------------------------------------
   //  Metadata table Options
   // ------------------------------------------------------------------------
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
index 00ebf09426..93044f6ca3 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
@@ -20,6 +20,7 @@ package org.apache.hudi.configuration;
 
 import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
 import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.table.format.FilePathUtils;
@@ -189,4 +190,12 @@ public class OptionsResolver {
     return conf.getOptional(FlinkOptions.READ_START_COMMIT).isPresent()
         && 
!conf.get(FlinkOptions.READ_START_COMMIT).equalsIgnoreCase(FlinkOptions.START_COMMIT_EARLIEST);
   }
+
+  /**
+   * Returns the supplemental logging mode.
+   */
+  public static HoodieCDCSupplementalLoggingMode 
getCDCSupplementalLoggingMode(Configuration conf) {
+    String mode = conf.getString(FlinkOptions.SUPPLEMENTAL_LOGGING_MODE);
+    return HoodieCDCSupplementalLoggingMode.parse(mode);
+  }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
index 0be2a5300f..dc10970b05 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
@@ -21,8 +21,11 @@ package org.apache.hudi.source;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.BaseFile;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.cdc.HoodieCDCExtractor;
+import org.apache.hudi.common.table.cdc.HoodieCDCFileSplit;
 import org.apache.hudi.common.table.log.InstantRange;
 import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -33,6 +36,7 @@ import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
 import org.apache.hudi.sink.partitioner.profile.WriteProfiles;
+import org.apache.hudi.table.format.cdc.CdcInputSplit;
 import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
 import org.apache.hudi.util.StreamerUtil;
 
@@ -52,6 +56,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -243,23 +248,7 @@ public class IncrementalInputSplits implements 
Serializable {
     final HoodieInstant instantToIssue = instants.size() == 0 ? null : 
instants.get(instants.size() - 1);
     final InstantRange instantRange;
     if (instantToIssue != null) {
-      if (issuedInstant != null) {
-        // the streaming reader may record the last issued instant, if the 
issued instant is present,
-        // the instant range should be: (issued instant, the latest instant].
-        instantRange = 
InstantRange.builder().startInstant(issuedInstant).endInstant(instantToIssue.getTimestamp())
-            .rangeType(InstantRange.RangeType.OPEN_CLOSE).build();
-      } else if 
(this.conf.getOptional(FlinkOptions.READ_START_COMMIT).isPresent()) {
-        // first time consume and has a start commit
-        final String startCommit = 
this.conf.getString(FlinkOptions.READ_START_COMMIT);
-        instantRange = 
startCommit.equalsIgnoreCase(FlinkOptions.START_COMMIT_EARLIEST)
-            ? null
-            : 
InstantRange.builder().startInstant(startCommit).endInstant(instantToIssue.getTimestamp())
-                .rangeType(InstantRange.RangeType.CLOSE_CLOSE).build();
-      } else {
-        // first time consume and no start commit, consumes the latest 
incremental data set.
-        instantRange = 
InstantRange.builder().startInstant(instantToIssue.getTimestamp()).endInstant(instantToIssue.getTimestamp())
-            .rangeType(InstantRange.RangeType.CLOSE_CLOSE).build();
-      }
+      instantRange = getInstantRange(issuedInstant, 
instantToIssue.getTimestamp(), false);
     } else {
       LOG.info("No new instant found for the table under path " + path + ", 
skip reading");
       return Result.EMPTY;
@@ -315,6 +304,97 @@ public class IncrementalInputSplits implements 
Serializable {
     return Result.instance(inputSplits, endInstant);
   }
 
+  /**
+   * Returns the incremental cdc input splits.
+   *
+   * @param metaClient    The meta client
+   * @param issuedInstant The last issued instant, only valid in streaming read
+   * @return The list of incremental input splits or empty if there are no new 
instants
+   */
+  public Result inputSplitsCDC(
+      HoodieTableMetaClient metaClient,
+      String issuedInstant) {
+    metaClient.reloadActiveTimeline();
+    HoodieTimeline commitTimeline = 
metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants();
+    if (commitTimeline.empty()) {
+      LOG.warn("No splits found for the table under path " + path);
+      return Result.EMPTY;
+    }
+    List<HoodieInstant> instants = filterInstantsWithRange(commitTimeline, 
issuedInstant);
+    // get the latest instant that satisfies condition
+    final HoodieInstant instantToIssue = instants.size() == 0 ? null : 
instants.get(instants.size() - 1);
+    final InstantRange instantRange;
+    if (instantToIssue != null) {
+      instantRange = getInstantRange(issuedInstant, 
instantToIssue.getTimestamp(), true);
+    } else {
+      LOG.info("No new instant found for the table under path " + path + ", 
skip reading");
+      return Result.EMPTY;
+    }
+
+    Set<String> readPartitions;
+    final FileStatus[] fileStatuses;
+
+    if (instantRange == null) {
+      // reading from the earliest, scans the partitions and files directly.
+      FileIndex fileIndex = getFileIndex();
+      readPartitions = new HashSet<>(fileIndex.getOrBuildPartitionPaths());
+      if (readPartitions.size() == 0) {
+        LOG.warn("No partitions found for reading in path: " + path);
+        return Result.EMPTY;
+      }
+      fileStatuses = fileIndex.getFilesInPartitions();
+
+      if (fileStatuses.length == 0) {
+        LOG.warn("No files found for reading in path: " + path);
+        return Result.EMPTY;
+      }
+
+      final String endInstant = instantToIssue.getTimestamp();
+      List<MergeOnReadInputSplit> inputSplits = getInputSplits(metaClient, 
commitTimeline,
+          fileStatuses, readPartitions, endInstant, null);
+
+      return Result.instance(inputSplits, endInstant);
+    } else {
+      HoodieCDCExtractor extractor = new HoodieCDCExtractor(metaClient, 
instantRange);
+      final String endInstant = instantToIssue.getTimestamp();
+      Map<HoodieFileGroupId, List<HoodieCDCFileSplit>> fileSplits = 
extractor.extractCDCFileSplits();
+
+      if (fileSplits.isEmpty()) {
+        LOG.warn("No change logs found for reading in path: " + path);
+        return Result.EMPTY;
+      }
+
+      final AtomicInteger cnt = new AtomicInteger(0);
+      List<MergeOnReadInputSplit> inputSplits = fileSplits.entrySet().stream()
+          .map(splits ->
+              new CdcInputSplit(cnt.getAndAdd(1), metaClient.getBasePath(), 
maxCompactionMemoryInBytes,
+                  splits.getKey().getFileId(), 
splits.getValue().stream().sorted().toArray(HoodieCDCFileSplit[]::new)))
+          .collect(Collectors.toList());
+      return Result.instance(inputSplits, endInstant);
+    }
+  }
+
+  @Nullable
+  private InstantRange getInstantRange(String issuedInstant, String 
instantToIssue, boolean nullableBoundary) {
+    if (issuedInstant != null) {
+      // the streaming reader may record the last issued instant, if the 
issued instant is present,
+      // the instant range should be: (issued instant, the latest instant].
+      return 
InstantRange.builder().startInstant(issuedInstant).endInstant(instantToIssue)
+          
.nullableBoundary(nullableBoundary).rangeType(InstantRange.RangeType.OPEN_CLOSE).build();
+    } else if 
(this.conf.getOptional(FlinkOptions.READ_START_COMMIT).isPresent()) {
+      // first time consume and has a start commit
+      final String startCommit = 
this.conf.getString(FlinkOptions.READ_START_COMMIT);
+      return startCommit.equalsIgnoreCase(FlinkOptions.START_COMMIT_EARLIEST)
+          ? null
+          : 
InstantRange.builder().startInstant(startCommit).endInstant(instantToIssue)
+          
.nullableBoundary(nullableBoundary).rangeType(InstantRange.RangeType.CLOSE_CLOSE).build();
+    } else {
+      // first time consume and no start commit, consumes the latest 
incremental data set.
+      return 
InstantRange.builder().startInstant(instantToIssue).endInstant(instantToIssue)
+          
.nullableBoundary(nullableBoundary).rangeType(InstantRange.RangeType.CLOSE_CLOSE).build();
+    }
+  }
+
   private List<MergeOnReadInputSplit> getInputSplits(
       HoodieTableMetaClient metaClient,
       HoodieTimeline commitTimeline,
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
index 3318cecf10..1812262ab4 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
@@ -81,6 +81,11 @@ public class StreamReadMonitoringFunction
    */
   private final long interval;
 
+  /**
+   * Flag saying whether the change log capture is enabled.
+   */
+  private final boolean cdcEnabled;
+
   private transient Object checkpointLock;
 
   private volatile boolean isRunning = true;
@@ -106,6 +111,7 @@ public class StreamReadMonitoringFunction
     this.conf = conf;
     this.path = path;
     this.interval = 
conf.getInteger(FlinkOptions.READ_STREAMING_CHECK_INTERVAL);
+    this.cdcEnabled = conf.getBoolean(FlinkOptions.CDC_ENABLED);
     this.incrementalInputSplits = IncrementalInputSplits.builder()
         .conf(conf)
         .path(path)
@@ -195,8 +201,9 @@ public class StreamReadMonitoringFunction
       // table does not exist
       return;
     }
-    IncrementalInputSplits.Result result =
-        incrementalInputSplits.inputSplits(metaClient, this.hadoopConf, 
this.issuedInstant);
+    IncrementalInputSplits.Result result = cdcEnabled
+        ? incrementalInputSplits.inputSplitsCDC(metaClient, this.issuedInstant)
+        : incrementalInputSplits.inputSplits(metaClient, this.hadoopConf, 
this.issuedInstant);
     if (result.isEmpty()) {
       // no new instants, returns early
       return;
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index 6fac5e4b88..4533ef7179 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -42,6 +42,7 @@ import org.apache.hudi.source.IncrementalInputSplits;
 import org.apache.hudi.source.StreamReadMonitoringFunction;
 import org.apache.hudi.source.StreamReadOperator;
 import org.apache.hudi.table.format.FilePathUtils;
+import org.apache.hudi.table.format.cdc.CdcInputFormat;
 import org.apache.hudi.table.format.cow.CopyOnWriteInputFormat;
 import org.apache.hudi.table.format.mor.MergeOnReadInputFormat;
 import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
@@ -393,8 +394,12 @@ public class HoodieTableSource implements
     if (FlinkOptions.QUERY_TYPE_SNAPSHOT.equals(queryType)) {
       final HoodieTableType tableType = 
HoodieTableType.valueOf(this.conf.getString(FlinkOptions.TABLE_TYPE));
       boolean emitDelete = tableType == HoodieTableType.MERGE_ON_READ;
-      return mergeOnReadInputFormat(rowType, requiredRowType, tableAvroSchema,
-          rowDataType, Collections.emptyList(), emitDelete);
+      if (this.conf.getBoolean(FlinkOptions.CDC_ENABLED)) {
+        return cdcInputFormat(rowType, requiredRowType, tableAvroSchema, 
rowDataType, Collections.emptyList());
+      } else {
+        return mergeOnReadInputFormat(rowType, requiredRowType, 
tableAvroSchema,
+            rowDataType, Collections.emptyList(), emitDelete);
+      }
     }
     String errMsg = String.format("Invalid query type : '%s', options ['%s'] 
are supported now", queryType,
         FlinkOptions.QUERY_TYPE_SNAPSHOT);
@@ -410,6 +415,31 @@ public class HoodieTableSource implements
     return instantAndCommitMetadata.isPresent();
   }
 
+  private MergeOnReadInputFormat cdcInputFormat(
+      RowType rowType,
+      RowType requiredRowType,
+      Schema tableAvroSchema,
+      DataType rowDataType,
+      List<MergeOnReadInputSplit> inputSplits) {
+    final MergeOnReadTableState hoodieTableState = new MergeOnReadTableState(
+        rowType,
+        requiredRowType,
+        tableAvroSchema.toString(),
+        AvroSchemaConverter.convertToSchema(requiredRowType).toString(),
+        inputSplits,
+        conf.getString(FlinkOptions.RECORD_KEY_FIELD).split(","));
+    return CdcInputFormat.builder()
+        .config(this.conf)
+        .tableState(hoodieTableState)
+        // use the explicit fields' data type because the AvroSchemaConverter
+        // is not very stable.
+        .fieldTypes(rowDataType.getChildren())
+        .defaultPartName(conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME))
+        .limit(this.limit)
+        .emitDelete(false) // the change logs iterator can handle the DELETE 
records
+        .build();
+  }
+
   private MergeOnReadInputFormat mergeOnReadInputFormat(
       RowType rowType,
       RowType requiredRowType,
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
index 8adbde355c..f44ec67e5e 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
@@ -26,11 +26,13 @@ import 
org.apache.hudi.common.table.log.HoodieUnMergedLogRecordScanner;
 import org.apache.hudi.common.util.DefaultSizeEstimator;
 import org.apache.hudi.common.util.Functions;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ExternalSpillableMap;
 import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
 import org.apache.hudi.common.util.queue.BoundedInMemoryQueueProducer;
 import org.apache.hudi.common.util.queue.FunctionBasedQueueProducer;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
 import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
 import org.apache.hudi.util.StreamerUtil;
@@ -44,6 +46,7 @@ import org.apache.flink.types.RowKind;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Iterator;
@@ -118,6 +121,23 @@ public class FormatUtils {
     return pos == -1 ? null : record.get(pos);
   }
 
+  public static ExternalSpillableMap<String, byte[]> spillableMap(
+      HoodieWriteConfig writeConfig,
+      long maxCompactionMemoryInBytes) {
+    try {
+      return new ExternalSpillableMap<>(
+          maxCompactionMemoryInBytes,
+          writeConfig.getSpillableMapBasePath(),
+          new DefaultSizeEstimator<>(),
+          new DefaultSizeEstimator<>(),
+          writeConfig.getCommonConfig().getSpillableDiskMapType(),
+          writeConfig.getCommonConfig().isBitCaskDiskMapCompressionEnabled());
+    } catch (IOException e) {
+      throw new HoodieIOException(
+          "IOException when creating ExternalSpillableMap at " + 
writeConfig.getSpillableMapBasePath(), e);
+    }
+  }
+
   public static HoodieMergedLogRecordScanner logScanner(
       MergeOnReadInputSplit split,
       Schema logSchema,
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
new file mode 100644
index 0000000000..41b05c9f1e
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
@@ -0,0 +1,723 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cdc;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.model.BaseFile;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.table.cdc.HoodieCDCFileSplit;
+import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
+import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
+import org.apache.hudi.common.table.log.HoodieCDCLogRecordIterator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.ExternalSpillableMap;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.OptionsResolver;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.table.format.FormatUtils;
+import 
org.apache.hudi.table.format.cow.vector.reader.ParquetColumnarRowSplitReader;
+import org.apache.hudi.table.format.mor.MergeOnReadInputFormat;
+import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
+import org.apache.hudi.table.format.mor.MergeOnReadTableState;
+import org.apache.hudi.util.AvroToRowDataConverters;
+import org.apache.hudi.util.RowDataProjection;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+import org.apache.hadoop.fs.Path;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static 
org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS;
+import static org.apache.hudi.table.format.FormatUtils.buildAvroRecordBySchema;
+
+/**
+ * The base InputFormat class to read Hoodie data set as change logs.
+ */
+public class CdcInputFormat extends MergeOnReadInputFormat {
+  private static final long serialVersionUID = 1L;
+
+  private CdcInputFormat(
+      Configuration conf,
+      MergeOnReadTableState tableState,
+      List<DataType> fieldTypes,
+      String defaultPartName,
+      long limit,
+      boolean emitDelete) {
+    super(conf, tableState, fieldTypes, defaultPartName, limit, emitDelete);
+  }
+
+  @Override
+  protected RecordIterator initIterator(MergeOnReadInputSplit split) throws 
IOException {
+    if (split instanceof CdcInputSplit) {
+      HoodieCDCSupplementalLoggingMode mode = 
OptionsResolver.getCDCSupplementalLoggingMode(conf);
+      ImageManager manager = new ImageManager(conf, tableState.getRowType(), 
this::getFileSliceIterator);
+      Function<HoodieCDCFileSplit, RecordIterator> recordIteratorFunc =
+          cdcFileSplit -> getRecordIteratorV2(split.getTablePath(), 
split.getMaxCompactionMemoryInBytes(), cdcFileSplit, mode, manager);
+      return new CdcFileSplitsIterator((CdcInputSplit) split, manager, 
recordIteratorFunc);
+    } else {
+      return super.initIterator(split);
+    }
+  }
+
+  /**
+   * Returns the builder for {@link MergeOnReadInputFormat}.
+   */
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  private RecordIterator getFileSliceIterator(MergeOnReadInputSplit split) {
+    if (!(split.getLogPaths().isPresent() && split.getLogPaths().get().size() 
> 0)) {
+      // base file only
+      return new 
BaseFileOnlyIterator(getFullSchemaReader(split.getBasePath().get()));
+    } else if (!split.getBasePath().isPresent()) {
+      // log files only
+      return new LogFileOnlyIterator(getFullLogFileIterator(split));
+    } else {
+      Schema tableSchema = new 
Schema.Parser().parse(this.tableState.getAvroSchema());
+      return new MergeIterator(
+          conf,
+          hadoopConf,
+          split,
+          this.tableState.getRowType(),
+          this.tableState.getRowType(),
+          tableSchema,
+          Option.empty(),
+          Option.empty(),
+          false,
+          this.tableState.getOperationPos(),
+          getFullSchemaReader(split.getBasePath().get()));
+    }
+  }
+
+  private RecordIterator getRecordIteratorV2(
+      String tablePath,
+      long maxCompactionMemoryInBytes,
+      HoodieCDCFileSplit fileSplit,
+      HoodieCDCSupplementalLoggingMode mode,
+      ImageManager imageManager) {
+    try {
+      return getRecordIterator(tablePath, maxCompactionMemoryInBytes, 
fileSplit, mode, imageManager);
+    } catch (IOException e) {
+      throw new HoodieException("Get record iterator error", e);
+    }
+  }
+
+  private RecordIterator getRecordIterator(
+      String tablePath,
+      long maxCompactionMemoryInBytes,
+      HoodieCDCFileSplit fileSplit,
+      HoodieCDCSupplementalLoggingMode mode,
+      ImageManager imageManager) throws IOException {
+    switch (fileSplit.getCdcInferCase()) {
+      case BASE_FILE_INSERT:
+        ValidationUtils.checkState(fileSplit.getCdcFile() != null,
+            "CDC file path should exist");
+        String path = new Path(tablePath, fileSplit.getCdcFile()).toString();
+        return new AddBaseFileIterator(getRequiredSchemaReader(path));
+      case BASE_FILE_DELETE:
+        ValidationUtils.checkState(fileSplit.getBeforeFileSlice().isPresent(),
+            "Before file slice should exist");
+        FileSlice fileSlice = fileSplit.getBeforeFileSlice().get();
+        MergeOnReadInputSplit inputSplit = fileSlice2Split(tablePath, 
fileSlice, maxCompactionMemoryInBytes);
+        return new RemoveBaseFileIterator(tableState, 
getFileSliceIterator(inputSplit));
+      case AS_IS:
+        Schema dataSchema = HoodieAvroUtils.removeMetadataFields(new 
Schema.Parser().parse(tableState.getAvroSchema()));
+        Schema cdcSchema = 
HoodieCDCUtils.schemaBySupplementalLoggingMode(mode, dataSchema);
+        switch (mode) {
+          case WITH_BEFORE_AFTER:
+            return new BeforeAfterImageIterator(tablePath, tableState, 
hadoopConf, cdcSchema, fileSplit);
+          case WITH_BEFORE:
+            return new BeforeImageIterator(conf, hadoopConf, tablePath, 
tableState, cdcSchema, fileSplit, imageManager);
+          case OP_KEY:
+            return new RecordKeyImageIterator(conf, hadoopConf, tablePath, 
tableState, cdcSchema, fileSplit, imageManager);
+          default:
+            throw new AssertionError("Unexpected mode" + mode);
+        }
+      case REPLACE_COMMIT:
+        return new ReplaceCommitIterator(conf, tablePath, tableState, 
fileSplit, this::getFileSliceIterator);
+      default:
+        throw new AssertionError("Unexpected cdc file split infer case: " + 
fileSplit.getCdcInferCase());
+    }
+  }
+
+  // -------------------------------------------------------------------------
+  //  Inner Class
+  // -------------------------------------------------------------------------
+  static class CdcFileSplitsIterator implements RecordIterator {
+    private ImageManager imageManager; //  keep a reference to release resource
+    private final Iterator<HoodieCDCFileSplit> fileSplitIterator;
+    private final Function<HoodieCDCFileSplit, RecordIterator> 
recordIteratorFunc;
+    private RecordIterator recordIterator;
+
+    CdcFileSplitsIterator(
+        CdcInputSplit inputSplit,
+        ImageManager imageManager,
+        Function<HoodieCDCFileSplit, RecordIterator> recordIteratorFunc) {
+      this.fileSplitIterator = 
Arrays.asList(inputSplit.getChanges()).iterator();
+      this.imageManager = imageManager;
+      this.recordIteratorFunc = recordIteratorFunc;
+    }
+
+    @Override
+    public boolean reachedEnd() throws IOException {
+      if (recordIterator != null) {
+        if (!recordIterator.reachedEnd()) {
+          return false;
+        } else {
+          recordIterator.close(); // release resource
+          recordIterator = null;
+        }
+      }
+      if (fileSplitIterator.hasNext()) {
+        HoodieCDCFileSplit fileSplit = fileSplitIterator.next();
+        recordIterator = recordIteratorFunc.apply(fileSplit);
+        return recordIterator.reachedEnd();
+      }
+      return true;
+    }
+
+    @Override
+    public RowData nextRecord() {
+      return recordIterator.nextRecord();
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (recordIterator != null) {
+        recordIterator.close();
+      }
+      if (imageManager != null) {
+        imageManager.close();
+        imageManager = null;
+      }
+    }
+  }
+
+  static class AddBaseFileIterator implements RecordIterator {
+    // base file reader
+    private ParquetColumnarRowSplitReader reader;
+
+    private RowData currentRecord;
+
+    AddBaseFileIterator(ParquetColumnarRowSplitReader reader) {
+      this.reader = reader;
+    }
+
+    @Override
+    public boolean reachedEnd() throws IOException {
+      if (!this.reader.reachedEnd()) {
+        currentRecord = this.reader.nextRecord();
+        currentRecord.setRowKind(RowKind.INSERT);
+        return false;
+      }
+      return true;
+    }
+
+    @Override
+    public RowData nextRecord() {
+      return currentRecord;
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (this.reader != null) {
+        this.reader.close();
+        this.reader = null;
+      }
+    }
+  }
+
+  static class RemoveBaseFileIterator implements RecordIterator {
+    private RecordIterator nested;
+    private final RowDataProjection projection;
+
+    RemoveBaseFileIterator(MergeOnReadTableState tableState, RecordIterator 
iterator) {
+      this.nested = iterator;
+      this.projection = 
RowDataProjection.instance(tableState.getRequiredRowType(), 
tableState.getRequiredPositions());
+    }
+
+    @Override
+    public boolean reachedEnd() throws IOException {
+      return nested.reachedEnd();
+    }
+
+    @Override
+    public RowData nextRecord() {
+      RowData row = nested.nextRecord();
+      row.setRowKind(RowKind.DELETE);
+      return this.projection.project(row);
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (this.nested != null) {
+        this.nested.close();
+        this.nested = null;
+      }
+    }
+  }
+
+  abstract static class BaseImageIterator implements RecordIterator {
+    private final Schema requiredSchema;
+    private final int[] requiredPos;
+    private final GenericRecordBuilder recordBuilder;
+    private final AvroToRowDataConverters.AvroToRowDataConverter 
avroToRowDataConverter;
+
+    // the changelog records iterator
+    private HoodieCDCLogRecordIterator cdcItr;
+
+    private GenericRecord cdcRecord;
+
+    private RowData sideImage;
+
+    private RowData currentImage;
+
+    BaseImageIterator(
+        org.apache.hadoop.conf.Configuration hadoopConf,
+        String tablePath,
+        MergeOnReadTableState tableState,
+        Schema cdcSchema,
+        HoodieCDCFileSplit fileSplit) throws IOException {
+      this.requiredSchema = new 
Schema.Parser().parse(tableState.getRequiredAvroSchema());
+      this.requiredPos = getRequiredPos(tableState.getAvroSchema(), 
this.requiredSchema);
+      this.recordBuilder = new GenericRecordBuilder(requiredSchema);
+      this.avroToRowDataConverter = 
AvroToRowDataConverters.createRowConverter(tableState.getRequiredRowType());
+      Path cdcFilePath = new Path(tablePath, fileSplit.getCdcFile());
+      this.cdcItr = new HoodieCDCLogRecordIterator(hadoopConf, cdcFilePath, 
cdcSchema);
+    }
+
+    private int[] getRequiredPos(String tableSchema, Schema required) {
+      Schema dataSchema = HoodieAvroUtils.removeMetadataFields(new 
Schema.Parser().parse(tableSchema));
+      List<String> fields = 
dataSchema.getFields().stream().map(Schema.Field::name).collect(Collectors.toList());
+      return required.getFields().stream()
+          .map(f -> fields.indexOf(f.name()))
+          .mapToInt(i -> i)
+          .toArray();
+    }
+
+    @Override
+    public boolean reachedEnd() {
+      if (this.sideImage != null) {
+        currentImage = this.sideImage;
+        this.sideImage = null;
+        return false;
+      } else if (this.cdcItr.hasNext()) {
+        cdcRecord = (GenericRecord) this.cdcItr.next();
+        String op = String.valueOf(cdcRecord.get(0));
+        resolveImage(op);
+        return false;
+      }
+      return true;
+    }
+
+    protected abstract RowData getAfterImage(RowKind rowKind, GenericRecord 
cdcRecord);
+
+    protected abstract RowData getBeforeImage(RowKind rowKind, GenericRecord 
cdcRecord);
+
+    @Override
+    public RowData nextRecord() {
+      return currentImage;
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (this.cdcItr != null) {
+        this.cdcItr.close();
+        this.cdcItr = null;
+      }
+    }
+
+    private void resolveImage(String op) {
+      switch (op) {
+        case "i":
+          currentImage = getAfterImage(RowKind.INSERT, cdcRecord);
+          break;
+        case "u":
+          currentImage = getBeforeImage(RowKind.UPDATE_BEFORE, cdcRecord);
+          sideImage = getAfterImage(RowKind.UPDATE_AFTER, cdcRecord);
+          break;
+        case "d":
+          currentImage = getBeforeImage(RowKind.DELETE, cdcRecord);
+          break;
+        default:
+          throw new AssertionError("Unexpected");
+      }
+    }
+
+    protected RowData resolveAvro(RowKind rowKind, GenericRecord avroRecord) {
+      GenericRecord requiredAvroRecord = buildAvroRecordBySchema(
+          avroRecord,
+          requiredSchema,
+          requiredPos,
+          recordBuilder);
+      RowData resolved = (RowData) 
avroToRowDataConverter.convert(requiredAvroRecord);
+      resolved.setRowKind(rowKind);
+      return resolved;
+    }
+  }
+
+  // op, ts, before_image, after_image
+  static class BeforeAfterImageIterator extends BaseImageIterator {
+    BeforeAfterImageIterator(
+        String tablePath,
+        MergeOnReadTableState tableState,
+        org.apache.hadoop.conf.Configuration hadoopConf,
+        Schema cdcSchema,
+        HoodieCDCFileSplit fileSplit) throws IOException {
+      super(hadoopConf, tablePath, tableState, cdcSchema, fileSplit);
+    }
+
+    @Override
+    protected RowData getAfterImage(RowKind rowKind, GenericRecord cdcRecord) {
+      return resolveAvro(rowKind, (GenericRecord) cdcRecord.get(3));
+    }
+
+    @Override
+    protected RowData getBeforeImage(RowKind rowKind, GenericRecord cdcRecord) 
{
+      return resolveAvro(rowKind, (GenericRecord) cdcRecord.get(2));
+    }
+  }
+
+  // op, key, before_image
+  static class BeforeImageIterator extends BaseImageIterator {
+    protected ExternalSpillableMap<String, byte[]> afterImages;
+
+    protected final long maxCompactionMemoryInBytes;
+
+    protected final RowDataProjection projection;
+    protected final ImageManager imageManager;
+
+    BeforeImageIterator(
+        Configuration flinkConf,
+        org.apache.hadoop.conf.Configuration hadoopConf,
+        String tablePath,
+        MergeOnReadTableState tableState,
+        Schema cdcSchema,
+        HoodieCDCFileSplit fileSplit,
+        ImageManager imageManager) throws IOException {
+      super(hadoopConf, tablePath, tableState, cdcSchema, fileSplit);
+      this.maxCompactionMemoryInBytes = 
StreamerUtil.getMaxCompactionMemoryInBytes(flinkConf);
+      this.projection = 
RowDataProjection.instance(tableState.getRequiredRowType(), 
tableState.getRequiredPositions());
+      this.imageManager = imageManager;
+      initImages(fileSplit);
+    }
+
+    protected void initImages(
+        HoodieCDCFileSplit fileSplit) throws IOException {
+      ValidationUtils.checkState(fileSplit.getAfterFileSlice().isPresent(),
+          "Current file slice does not exist for instant: " + 
fileSplit.getInstant());
+      this.afterImages = this.imageManager.getOrLoadImages(
+          maxCompactionMemoryInBytes, fileSplit.getAfterFileSlice().get());
+    }
+
+    @Override
+    protected RowData getAfterImage(RowKind rowKind, GenericRecord cdcRecord) {
+      String recordKey = cdcRecord.get(1).toString();
+      RowData row = imageManager.getImageRecord(recordKey, this.afterImages, 
rowKind);
+      row.setRowKind(rowKind);
+      return this.projection.project(row);
+    }
+
+    @Override
+    protected RowData getBeforeImage(RowKind rowKind, GenericRecord cdcRecord) 
{
+      return resolveAvro(rowKind, (GenericRecord) cdcRecord.get(2));
+    }
+  }
+
+  // op, key
+  static class RecordKeyImageIterator extends BeforeImageIterator {
+    protected ExternalSpillableMap<String, byte[]> beforeImages;
+
+    RecordKeyImageIterator(
+        Configuration flinkConf,
+        org.apache.hadoop.conf.Configuration hadoopConf,
+        String tablePath,
+        MergeOnReadTableState tableState,
+        Schema cdcSchema,
+        HoodieCDCFileSplit fileSplit,
+        ImageManager imageManager) throws IOException {
+      super(flinkConf, hadoopConf, tablePath, tableState, cdcSchema, 
fileSplit, imageManager);
+    }
+
+    protected void initImages(HoodieCDCFileSplit fileSplit) throws IOException 
{
+      // init after images
+      super.initImages(fileSplit);
+      // init before images
+      ValidationUtils.checkState(fileSplit.getBeforeFileSlice().isPresent(),
+          "Before file slice does not exist for instant: " + 
fileSplit.getInstant());
+      this.beforeImages = this.imageManager.getOrLoadImages(
+          maxCompactionMemoryInBytes, fileSplit.getBeforeFileSlice().get());
+    }
+
+    @Override
+    protected RowData getBeforeImage(RowKind rowKind, GenericRecord cdcRecord) 
{
+      String recordKey = cdcRecord.get(1).toString();
+      RowData row = this.imageManager.getImageRecord(recordKey, 
this.beforeImages, rowKind);
+      row.setRowKind(rowKind);
+      return this.projection.project(row);
+    }
+  }
+
+  static class ReplaceCommitIterator implements RecordIterator {
+    private final RecordIterator itr;
+    private final RowDataProjection projection;
+
+    ReplaceCommitIterator(
+        Configuration flinkConf,
+        String tablePath,
+        MergeOnReadTableState tableState,
+        HoodieCDCFileSplit fileSplit,
+        Function<MergeOnReadInputSplit, RecordIterator> splitIteratorFunc) {
+      this.itr = initIterator(tablePath, 
StreamerUtil.getMaxCompactionMemoryInBytes(flinkConf), fileSplit, 
splitIteratorFunc);
+      this.projection = 
RowDataProjection.instance(tableState.getRequiredRowType(), 
tableState.getRequiredPositions());
+    }
+
+    private RecordIterator initIterator(
+        String tablePath,
+        long maxCompactionMemoryInBytes,
+        HoodieCDCFileSplit fileSplit,
+        Function<MergeOnReadInputSplit, RecordIterator> splitIteratorFunc) {
+      // init before images
+
+      // the before file slice must exist,
+      // see HoodieCDCExtractor#extractCDCFileSplits for details
+      ValidationUtils.checkState(fileSplit.getBeforeFileSlice().isPresent(),
+          "Before file slice does not exist for instant: " + 
fileSplit.getInstant());
+      MergeOnReadInputSplit inputSplit = CdcInputFormat.fileSlice2Split(
+          tablePath, fileSplit.getBeforeFileSlice().get(), 
maxCompactionMemoryInBytes);
+      return splitIteratorFunc.apply(inputSplit);
+    }
+
+    @Override
+    public boolean reachedEnd() throws IOException {
+      return this.itr.reachedEnd();
+    }
+
+    @Override
+    public RowData nextRecord() {
+      RowData row = this.itr.nextRecord();
+      row.setRowKind(RowKind.DELETE);
+      return this.projection.project(row);
+    }
+
+    @Override
+    public void close() throws IOException {
+      this.itr.close();
+    }
+  }
+
+  public static final class BytesArrayInputView extends DataInputStream 
implements DataInputView {
+    public BytesArrayInputView(byte[] data) {
+      super(new ByteArrayInputStream(data));
+    }
+
+    public void skipBytesToRead(int numBytes) throws IOException {
+      while (numBytes > 0) {
+        int skipped = this.skipBytes(numBytes);
+        numBytes -= skipped;
+      }
+    }
+  }
+
+  public static final class BytesArrayOutputView extends DataOutputStream 
implements DataOutputView {
+    public BytesArrayOutputView(ByteArrayOutputStream baos) {
+      super(baos);
+    }
+
+    public void skipBytesToWrite(int numBytes) throws IOException {
+      for (int i = 0; i < numBytes; ++i) {
+        this.write(0);
+      }
+    }
+
+    public void write(DataInputView source, int numBytes) throws IOException {
+      byte[] buffer = new byte[numBytes];
+      source.readFully(buffer);
+      this.write(buffer);
+    }
+  }
+
+  /**
+   * A before/after image manager
+   * that caches the image records by versions(file slices).
+   */
+  private static class ImageManager implements AutoCloseable {
+    private final HoodieWriteConfig writeConfig;
+
+    private final RowDataSerializer serializer;
+    private final Function<MergeOnReadInputSplit, RecordIterator> 
splitIteratorFunc;
+
+    private final Map<String, ExternalSpillableMap<String, byte[]>> cache;
+
+    public ImageManager(
+        Configuration flinkConf,
+        RowType rowType,
+        Function<MergeOnReadInputSplit, RecordIterator> splitIteratorFunc) {
+      this.serializer = new RowDataSerializer(rowType);
+      this.splitIteratorFunc = splitIteratorFunc;
+      this.cache = new TreeMap<>();
+      this.writeConfig = StreamerUtil.getHoodieClientConfig(flinkConf);
+    }
+
+    public ExternalSpillableMap<String, byte[]> getOrLoadImages(
+        long maxCompactionMemoryInBytes,
+        FileSlice fileSlice) throws IOException {
+      final String instant = fileSlice.getBaseInstantTime();
+      if (this.cache.containsKey(instant)) {
+        return cache.get(instant);
+      }
+      // clean the earliest file slice first
+      if (this.cache.size() > 1) {
+        // keep at most 2 versions: before & after
+        String instantToClean = this.cache.keySet().iterator().next();
+        this.cache.remove(instantToClean).close();
+      }
+      ExternalSpillableMap<String, byte[]> images = 
loadImageRecords(maxCompactionMemoryInBytes, fileSlice);
+      this.cache.put(instant, images);
+      return images;
+    }
+
+    private ExternalSpillableMap<String, byte[]> loadImageRecords(
+        long maxCompactionMemoryInBytes,
+        FileSlice fileSlice) throws IOException {
+      MergeOnReadInputSplit inputSplit = 
CdcInputFormat.fileSlice2Split(writeConfig.getBasePath(), fileSlice, 
maxCompactionMemoryInBytes);
+      RecordIterator itr = splitIteratorFunc.apply(inputSplit);
+      // initialize the image records map
+      ExternalSpillableMap<String, byte[]> imageRecordsMap =
+          FormatUtils.spillableMap(writeConfig, maxCompactionMemoryInBytes);
+      while (!itr.reachedEnd()) {
+        RowData row = itr.nextRecord();
+        String recordKey = row.getString(HOODIE_RECORD_KEY_COL_POS).toString();
+        ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
+        serializer.serialize(row, new BytesArrayOutputView(baos));
+        imageRecordsMap.put(recordKey, baos.toByteArray());
+      }
+      itr.close(); // release resource
+      return imageRecordsMap;
+    }
+
+    public RowData getImageRecord(
+        String recordKey,
+        ExternalSpillableMap<String, byte[]> cache,
+        RowKind rowKind) {
+      byte[] bytes = cache.get(recordKey);
+      ValidationUtils.checkState(bytes != null,
+          "Key " + recordKey + " does not exist in current file group");
+      try {
+        RowData row = serializer.deserialize(new BytesArrayInputView(bytes));
+        row.setRowKind(rowKind);
+        return row;
+      } catch (IOException e) {
+        throw new HoodieException("Deserialize bytes into row data exception", 
e);
+      }
+    }
+
+    @Override
+    public void close() {
+      this.cache.values().forEach(ExternalSpillableMap::close);
+      this.cache.clear();
+    }
+  }
+
+  /**
+   * Builder for {@link CdcInputFormat}.
+   */
+  public static class Builder extends MergeOnReadInputFormat.Builder {
+
+    public Builder config(Configuration conf) {
+      this.conf = conf;
+      return this;
+    }
+
+    public Builder tableState(MergeOnReadTableState tableState) {
+      this.tableState = tableState;
+      return this;
+    }
+
+    public Builder fieldTypes(List<DataType> fieldTypes) {
+      this.fieldTypes = fieldTypes;
+      return this;
+    }
+
+    public Builder defaultPartName(String defaultPartName) {
+      this.defaultPartName = defaultPartName;
+      return this;
+    }
+
+    public Builder limit(long limit) {
+      this.limit = limit;
+      return this;
+    }
+
+    public Builder emitDelete(boolean emitDelete) {
+      this.emitDelete = emitDelete;
+      return this;
+    }
+
+    public CdcInputFormat build() {
+      return new CdcInputFormat(conf, tableState, fieldTypes,
+          defaultPartName, limit, emitDelete);
+    }
+  }
+
+  // -------------------------------------------------------------------------
+  //  Utilities
+  // -------------------------------------------------------------------------
+  public static MergeOnReadInputSplit fileSlice2Split(
+      String tablePath,
+      FileSlice fileSlice,
+      long maxCompactionMemoryInBytes) {
+    Option<List<String>> logPaths = Option.ofNullable(fileSlice.getLogFiles()
+        .sorted(HoodieLogFile.getLogFileComparator())
+        .map(logFile -> logFile.getPath().toString())
+        // filter out the cdc logs
+        .filter(path -> !path.endsWith(HoodieCDCUtils.CDC_LOGFILE_SUFFIX))
+        .collect(Collectors.toList()));
+    String basePath = 
fileSlice.getBaseFile().map(BaseFile::getPath).orElse(null);
+    return new MergeOnReadInputSplit(0, basePath, logPaths,
+        fileSlice.getBaseInstantTime(), tablePath, maxCompactionMemoryInBytes,
+        FlinkOptions.REALTIME_PAYLOAD_COMBINE, null, fileSlice.getFileId());
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputSplit.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputSplit.java
new file mode 100644
index 0000000000..914d8518c7
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputSplit.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cdc;
+
+import org.apache.hudi.common.table.cdc.HoodieCDCFileSplit;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
+
+import java.util.Arrays;
+
+/**
+ * Represents an input split of source, actually a data bucket.
+ */
+public class CdcInputSplit extends MergeOnReadInputSplit {
+  private static final long serialVersionUID = 1L;
+
+  private HoodieCDCFileSplit[] changes;
+
+  public CdcInputSplit(
+      int splitNum,
+      String tablePath,
+      long maxCompactionMemoryInBytes,
+      String fileId,
+      HoodieCDCFileSplit[] changes) {
+    super(splitNum, null, Option.empty(), "", tablePath,
+        maxCompactionMemoryInBytes, "", null, fileId);
+    this.changes = changes;
+  }
+
+  public HoodieCDCFileSplit[] getChanges() {
+    return changes;
+  }
+
+  public void setChanges(HoodieCDCFileSplit[] changes) {
+    this.changes = changes;
+  }
+
+  @Override
+  public String toString() {
+    return "CdcInputSplit{"
+        + "changes=" + Arrays.toString(changes)
+        + ", fileId='" + fileId + '\''
+        + '}';
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
index c9b6561bde..e2e8079a47 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
@@ -66,6 +66,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Properties;
 import java.util.Set;
+import java.util.function.Function;
 import java.util.stream.IntStream;
 
 import static 
org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_COMMIT_TIME_COL_POS;
@@ -83,11 +84,11 @@ public class MergeOnReadInputFormat
 
   private static final long serialVersionUID = 1L;
 
-  private final Configuration conf;
+  protected final Configuration conf;
 
-  private transient org.apache.hadoop.conf.Configuration hadoopConf;
+  protected transient org.apache.hadoop.conf.Configuration hadoopConf;
 
-  private final MergeOnReadTableState tableState;
+  protected final MergeOnReadTableState tableState;
 
   /**
    * Uniform iterator view for the underneath records.
@@ -137,7 +138,7 @@ public class MergeOnReadInputFormat
    */
   private boolean closed = true;
 
-  private MergeOnReadInputFormat(
+  protected MergeOnReadInputFormat(
       Configuration conf,
       MergeOnReadTableState tableState,
       List<DataType> fieldTypes,
@@ -168,30 +169,35 @@ public class MergeOnReadInputFormat
     this.currentReadCount = 0L;
     this.closed = false;
     this.hadoopConf = HadoopConfigurations.getHadoopConf(this.conf);
+    this.iterator = initIterator(split);
+    mayShiftInputSplit(split);
+  }
+
+  protected RecordIterator initIterator(MergeOnReadInputSplit split) throws 
IOException {
     if (!(split.getLogPaths().isPresent() && split.getLogPaths().get().size() 
> 0)) {
       if (split.getInstantRange() != null) {
         // base file only with commit time filtering
-        this.iterator = new BaseFileOnlyFilteringIterator(
+        return new BaseFileOnlyFilteringIterator(
             split.getInstantRange(),
             this.tableState.getRequiredRowType(),
             getReader(split.getBasePath().get(), 
getRequiredPosWithCommitTime(this.requiredPos)));
       } else {
         // base file only
-        this.iterator = new 
BaseFileOnlyIterator(getRequiredSchemaReader(split.getBasePath().get()));
+        return new 
BaseFileOnlyIterator(getRequiredSchemaReader(split.getBasePath().get()));
       }
     } else if (!split.getBasePath().isPresent()) {
       // log files only
       if (OptionsResolver.emitChangelog(conf)) {
-        this.iterator = new 
LogFileOnlyIterator(getUnMergedLogFileIterator(split));
+        return new LogFileOnlyIterator(getUnMergedLogFileIterator(split));
       } else {
-        this.iterator = new LogFileOnlyIterator(getLogFileIterator(split));
+        return new LogFileOnlyIterator(getLogFileIterator(split));
       }
     } else if (split.getMergeType().equals(FlinkOptions.REALTIME_SKIP_MERGE)) {
-      this.iterator = new SkipMergeIterator(
+      return new SkipMergeIterator(
           getRequiredSchemaReader(split.getBasePath().get()),
           getLogFileIterator(split));
     } else if 
(split.getMergeType().equals(FlinkOptions.REALTIME_PAYLOAD_COMBINE)) {
-      this.iterator = new MergeIterator(
+      return new MergeIterator(
           conf,
           hadoopConf,
           split,
@@ -211,7 +217,6 @@ public class MergeOnReadInputFormat
           + "spark partition Index: " + split.getSplitNumber()
           + "merge type: " + split.getMergeType());
     }
-    mayShiftInputSplit(split);
   }
 
   @Override
@@ -284,11 +289,15 @@ public class MergeOnReadInputFormat
     }
   }
 
-  private ParquetColumnarRowSplitReader getFullSchemaReader(String path) 
throws IOException {
-    return getReader(path, IntStream.range(0, 
this.tableState.getRowType().getFieldCount()).toArray());
+  protected ParquetColumnarRowSplitReader getFullSchemaReader(String path) {
+    try {
+      return getReader(path, IntStream.range(0, 
this.tableState.getRowType().getFieldCount()).toArray());
+    } catch (IOException e) {
+      throw new HoodieException("Get reader error for path: " + path);
+    }
   }
 
-  private ParquetColumnarRowSplitReader getRequiredSchemaReader(String path) 
throws IOException {
+  protected ParquetColumnarRowSplitReader getRequiredSchemaReader(String path) 
throws IOException {
     return getReader(path, this.requiredPos);
   }
 
@@ -457,10 +466,63 @@ public class MergeOnReadInputFormat
     };
   }
 
+  protected ClosableIterator<RowData> 
getFullLogFileIterator(MergeOnReadInputSplit split) {
+    final Schema tableSchema = new 
Schema.Parser().parse(tableState.getAvroSchema());
+    final AvroToRowDataConverters.AvroToRowDataConverter 
avroToRowDataConverter =
+        AvroToRowDataConverters.createRowConverter(tableState.getRowType());
+    final HoodieMergedLogRecordScanner scanner = FormatUtils.logScanner(split, 
tableSchema, conf, hadoopConf);
+    final Iterator<String> logRecordsKeyIterator = 
scanner.getRecords().keySet().iterator();
+
+    return new ClosableIterator<RowData>() {
+      private RowData currentRecord;
+
+      @Override
+      public boolean hasNext() {
+        while (logRecordsKeyIterator.hasNext()) {
+          String curAvroKey = logRecordsKeyIterator.next();
+          Option<IndexedRecord> curAvroRecord = null;
+          final HoodieAvroRecord<?> hoodieRecord = (HoodieAvroRecord) 
scanner.getRecords().get(curAvroKey);
+          try {
+            curAvroRecord = hoodieRecord.getData().getInsertValue(tableSchema);
+          } catch (IOException e) {
+            throw new HoodieException("Get avro insert value error for key: " 
+ curAvroKey, e);
+          }
+          if (curAvroRecord.isPresent()) {
+            final IndexedRecord avroRecord = curAvroRecord.get();
+            final RowKind rowKind = FormatUtils.getRowKindSafely(avroRecord, 
tableState.getOperationPos());
+            if (rowKind == RowKind.DELETE) {
+              // skip the delete record
+              continue;
+            }
+            currentRecord = (RowData) 
avroToRowDataConverter.convert(avroRecord);
+            currentRecord.setRowKind(rowKind);
+            return true;
+          }
+          // else:
+          // delete record found
+          // skipping if the condition is unsatisfied
+          // continue;
+
+        }
+        return false;
+      }
+
+      @Override
+      public RowData next() {
+        return currentRecord;
+      }
+
+      @Override
+      public void close() {
+        scanner.close();
+      }
+    };
+  }
+
   // -------------------------------------------------------------------------
   //  Inner Class
   // -------------------------------------------------------------------------
-  private interface RecordIterator {
+  protected interface RecordIterator {
     boolean reachedEnd() throws IOException;
 
     RowData nextRecord();
@@ -468,11 +530,11 @@ public class MergeOnReadInputFormat
     void close() throws IOException;
   }
 
-  static class BaseFileOnlyIterator implements RecordIterator {
+  protected static class BaseFileOnlyIterator implements RecordIterator {
     // base file reader
     private final ParquetColumnarRowSplitReader reader;
 
-    BaseFileOnlyIterator(ParquetColumnarRowSplitReader reader) {
+    public BaseFileOnlyIterator(ParquetColumnarRowSplitReader reader) {
       this.reader = reader;
     }
 
@@ -545,11 +607,11 @@ public class MergeOnReadInputFormat
     }
   }
 
-  static class LogFileOnlyIterator implements RecordIterator {
+  protected static class LogFileOnlyIterator implements RecordIterator {
     // iterator for log files
     private final ClosableIterator<RowData> iterator;
 
-    LogFileOnlyIterator(ClosableIterator<RowData> iterator) {
+    public LogFileOnlyIterator(ClosableIterator<RowData> iterator) {
       this.iterator = iterator;
     }
 
@@ -619,7 +681,7 @@ public class MergeOnReadInputFormat
     }
   }
 
-  static class MergeIterator implements RecordIterator {
+  protected static class MergeIterator implements RecordIterator {
     // base file reader
     private final ParquetColumnarRowSplitReader reader;
     // log keys used for merging
@@ -628,15 +690,13 @@ public class MergeOnReadInputFormat
     private final HoodieMergedLogRecordScanner scanner;
 
     private final Schema tableSchema;
-    private final Schema requiredSchema;
-    private final int[] requiredPos;
     private final boolean emitDelete;
     private final int operationPos;
     private final RowDataToAvroConverters.RowDataToAvroConverter 
rowDataToAvroConverter;
     private final AvroToRowDataConverters.AvroToRowDataConverter 
avroToRowDataConverter;
-    private final GenericRecordBuilder recordBuilder;
 
-    private final RowDataProjection projection;
+    private final Option<RowDataProjection> projection;
+    private final Option<Function<IndexedRecord, GenericRecord>> 
avroProjection;
 
     private final InstantRange instantRange;
 
@@ -651,7 +711,7 @@ public class MergeOnReadInputFormat
 
     private RowData currentRecord;
 
-    MergeIterator(
+    public MergeIterator(
         Configuration flinkConf,
         org.apache.hadoop.conf.Configuration hadoopConf,
         MergeOnReadInputSplit split,
@@ -663,19 +723,35 @@ public class MergeOnReadInputFormat
         boolean emitDelete,
         int operationPos,
         ParquetColumnarRowSplitReader reader) { // the reader should be with 
full schema
+      this(flinkConf, hadoopConf, split, tableRowType, requiredRowType, 
tableSchema,
+          Option.of(RowDataProjection.instance(requiredRowType, requiredPos)),
+          Option.of(record -> buildAvroRecordBySchema(record, requiredSchema, 
requiredPos, new GenericRecordBuilder(requiredSchema))),
+          emitDelete, operationPos, reader);
+    }
+
+    public MergeIterator(
+        Configuration flinkConf,
+        org.apache.hadoop.conf.Configuration hadoopConf,
+        MergeOnReadInputSplit split,
+        RowType tableRowType,
+        RowType requiredRowType,
+        Schema tableSchema,
+        Option<RowDataProjection> projection,
+        Option<Function<IndexedRecord, GenericRecord>> avroProjection,
+        boolean emitDelete,
+        int operationPos,
+        ParquetColumnarRowSplitReader reader) { // the reader should be with 
full schema
       this.tableSchema = tableSchema;
       this.reader = reader;
       this.scanner = FormatUtils.logScanner(split, tableSchema, flinkConf, 
hadoopConf);
       this.payloadProps = StreamerUtil.getPayloadConfig(flinkConf).getProps();
       this.logKeysIterator = scanner.getRecords().keySet().iterator();
-      this.requiredSchema = requiredSchema;
-      this.requiredPos = requiredPos;
       this.emitDelete = emitDelete;
       this.operationPos = operationPos;
-      this.recordBuilder = new GenericRecordBuilder(requiredSchema);
+      this.avroProjection = avroProjection;
       this.rowDataToAvroConverter = 
RowDataToAvroConverters.createConverter(tableRowType);
       this.avroToRowDataConverter = 
AvroToRowDataConverters.createRowConverter(requiredRowType);
-      this.projection = RowDataProjection.instance(requiredRowType, 
requiredPos);
+      this.projection = projection;
       this.instantRange = split.getInstantRange().orElse(null);
     }
 
@@ -703,18 +779,18 @@ public class MergeOnReadInputFormat
               // deleted
               continue;
             }
-            GenericRecord avroRecord = buildAvroRecordBySchema(
-                mergedAvroRecord.get(),
-                requiredSchema,
-                requiredPos,
-                recordBuilder);
+            IndexedRecord avroRecord = avroProjection.isPresent()
+                ? avroProjection.get().apply(mergedAvroRecord.get())
+                : mergedAvroRecord.get();
             this.currentRecord = (RowData) 
avroToRowDataConverter.convert(avroRecord);
             this.currentRecord.setRowKind(rowKind);
             return false;
           }
         }
         // project the full record in base with required positions
-        currentRecord = projection.project(currentRecord);
+        if (projection.isPresent()) {
+          currentRecord = projection.get().project(currentRecord);
+        }
         return false;
       }
       // read the logs
@@ -725,11 +801,9 @@ public class MergeOnReadInputFormat
           Option<IndexedRecord> insertAvroRecord = getInsertValue(curKey);
           if (insertAvroRecord.isPresent()) {
             // the record is a DELETE if insertAvroRecord not present, skipping
-            GenericRecord avroRecord = buildAvroRecordBySchema(
-                insertAvroRecord.get(),
-                requiredSchema,
-                requiredPos,
-                recordBuilder);
+            IndexedRecord avroRecord = avroProjection.isPresent()
+                ? avroProjection.get().apply(insertAvroRecord.get())
+                : insertAvroRecord.get();
             this.currentRecord = (RowData) 
avroToRowDataConverter.convert(avroRecord);
             FormatUtils.setRowKind(this.currentRecord, insertAvroRecord.get(), 
this.operationPos);
             return false;
@@ -775,12 +849,12 @@ public class MergeOnReadInputFormat
    * Builder for {@link MergeOnReadInputFormat}.
    */
   public static class Builder {
-    private Configuration conf;
-    private MergeOnReadTableState tableState;
-    private List<DataType> fieldTypes;
-    private String defaultPartName;
-    private long limit = -1;
-    private boolean emitDelete = false;
+    protected Configuration conf;
+    protected MergeOnReadTableState tableState;
+    protected List<DataType> fieldTypes;
+    protected String defaultPartName;
+    protected long limit = -1;
+    protected boolean emitDelete = false;
 
     public Builder config(Configuration conf) {
       this.conf = conf;
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputSplit.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputSplit.java
index cde646e41f..7a54fa4b40 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputSplit.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputSplit.java
@@ -43,7 +43,7 @@ public class MergeOnReadInputSplit implements InputSplit {
   private final long maxCompactionMemoryInBytes;
   private final String mergeType;
   private final Option<InstantRange> instantRange;
-  private String fileId;
+  protected String fileId;
 
   // for streaming reader to record the consumed offset,
   // which is the start of next round reading.
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataProjection.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataProjection.java
index 8076d982b9..72a57df5ab 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataProjection.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataProjection.java
@@ -68,6 +68,7 @@ public class RowDataProjection implements Serializable {
    */
   public RowData project(RowData rowData) {
     GenericRowData genericRowData = new 
GenericRowData(this.fieldGetters.length);
+    genericRowData.setRowKind(rowData.getRowKind());
     for (int i = 0; i < this.fieldGetters.length; i++) {
       final Object val = this.fieldGetters[i].getFieldOrNull(rowData);
       genericRowData.setField(i, val);
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index 5a42f79aff..cdacefbf17 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -29,6 +29,7 @@ import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.EngineType;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieCleaningPolicy;
+import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.log.HoodieLogFormat;
@@ -316,6 +317,8 @@ public class StreamerUtil {
               
conf.getOptional(FlinkOptions.KEYGEN_CLASS_NAME).orElse(SimpleAvroKeyGenerator.class.getName()))
           
.setHiveStylePartitioningEnable(conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING))
           
.setUrlEncodePartitioning(conf.getBoolean(FlinkOptions.URL_ENCODE_PARTITIONING))
+          .setCDCEnabled(conf.getBoolean(FlinkOptions.CDC_ENABLED))
+          
.setCDCSupplementalLoggingMode(conf.getString(FlinkOptions.SUPPLEMENTAL_LOGGING_MODE))
           .setTimelineLayoutVersion(1)
           .initTable(hadoopConf, basePath);
       LOG.info("Table initialized under base path {}", basePath);
@@ -421,6 +424,7 @@ public class StreamerUtil {
     HoodieWriteConfig writeConfig = getHoodieClientConfig(conf, true, false);
     // build the write client to start the embedded timeline server
     final HoodieFlinkWriteClient writeClient = new 
HoodieFlinkWriteClient<>(HoodieFlinkEngineContext.DEFAULT, writeConfig);
+    
writeClient.setOperationType(WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION)));
     // create the filesystem view storage properties for client
     final FileSystemViewStorageConfig viewStorageConfig = 
writeConfig.getViewStorageConfig();
     // rebuild the view storage config with simplified options.
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index 49fb82709a..538b78d988 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -21,6 +21,7 @@ package org.apache.hudi.table;
 import org.apache.hudi.adapter.TestTableEnvs;
 import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.table.catalog.HoodieCatalogTestUtils;
@@ -141,6 +142,41 @@ public class ITTestHoodieDataSource extends 
AbstractTestBase {
     assertRowsEquals(rows3, TestData.DATA_SET_SOURCE_INSERT);
   }
 
+  @ParameterizedTest
+  @EnumSource(value = HoodieCDCSupplementalLoggingMode.class)
+  void 
testStreamReadFromSpecifiedCommitWithChangelog(HoodieCDCSupplementalLoggingMode 
mode) throws Exception {
+    streamTableEnv.getConfig().getConfiguration()
+        .setBoolean("table.dynamic-table-options.enabled", true);
+    // create filesystem table named source
+    String createSource = TestConfigurations.getFileSourceDDL("source");
+    streamTableEnv.executeSql(createSource);
+
+    String hoodieTableDDL = sql("t1")
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .option(FlinkOptions.READ_AS_STREAMING, true)
+        .option(FlinkOptions.CDC_ENABLED, true)
+        .option(FlinkOptions.SUPPLEMENTAL_LOGGING_MODE, mode.getValue())
+        .end();
+    streamTableEnv.executeSql(hoodieTableDDL);
+    String insertInto = "insert into t1 select * from source";
+    execInsertSql(streamTableEnv, insertInto);
+
+    String firstCommit = 
TestUtils.getFirstCompleteInstant(tempFile.getAbsolutePath());
+    List<Row> rows = execSelectSql(streamTableEnv,
+        "select * from t1/*+options('read.start-commit'='" + firstCommit + 
"')*/", 10);
+    assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT);
+
+    // insert another batch of data
+    execInsertSql(streamTableEnv, TestSQL.UPDATE_INSERT_T1);
+    List<Row> rows2 = execSelectSql(streamTableEnv, "select * from t1", 10);
+    assertRowsEquals(rows2, TestData.DATA_SET_SOURCE_CHANGELOG);
+
+    // specify the start commit as earliest
+    List<Row> rows3 = execSelectSql(streamTableEnv,
+        "select * from t1/*+options('read.start-commit'='earliest')*/", 10);
+    assertRowsEquals(rows3, TestData.DATA_SET_SOURCE_MERGED);
+  }
+
   @ParameterizedTest
   @EnumSource(value = HoodieTableType.class)
   void testStreamWriteAndRead(HoodieTableType tableType) throws Exception {
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
index 9f82161908..f066a0b702 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
@@ -23,12 +23,16 @@ import 
org.apache.hudi.client.common.HoodieFlinkEngineContext;
 import org.apache.hudi.common.model.EventTimeAvroPayload;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.HadoopConfigurations;
+import org.apache.hudi.source.IncrementalInputSplits;
 import org.apache.hudi.table.HoodieTableSource;
+import org.apache.hudi.table.format.cdc.CdcInputFormat;
 import org.apache.hudi.table.format.cow.CopyOnWriteInputFormat;
 import org.apache.hudi.table.format.mor.MergeOnReadInputFormat;
+import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
 import org.apache.hudi.util.AvroSchemaConverter;
 import org.apache.hudi.util.StreamerUtil;
 import org.apache.hudi.utils.TestConfigurations;
@@ -48,8 +52,10 @@ import org.junit.jupiter.params.provider.ValueSource;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -309,6 +315,89 @@ public class TestInputFormat {
     assertThat(actual, is(expected));
   }
 
+  @ParameterizedTest
+  @EnumSource(value = HoodieCDCSupplementalLoggingMode.class)
+  void testReadWithChangeLogCOW(HoodieCDCSupplementalLoggingMode mode) throws 
Exception {
+    Map<String, String> options = new HashMap<>();
+    options.put(FlinkOptions.CDC_ENABLED.key(), "true");
+    options.put(FlinkOptions.SUPPLEMENTAL_LOGGING_MODE.key(), mode.getValue());
+    beforeEach(HoodieTableType.COPY_ON_WRITE, options);
+
+    // write the insert data sets
+    TestData.writeData(TestData.DATA_SET_INSERT, conf);
+
+    // write another commit to read again
+    TestData.writeData(TestData.DATA_SET_UPDATE_DELETE, conf);
+
+    InputFormat<RowData, ?> inputFormat = 
this.tableSource.getInputFormat(true);
+    assertThat(inputFormat, instanceOf(CdcInputFormat.class));
+    HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
+    IncrementalInputSplits incrementalInputSplits = 
IncrementalInputSplits.builder()
+        .rowType(TestConfigurations.ROW_TYPE)
+        .conf(conf)
+        .path(FilePathUtils.toFlinkPath(metaClient.getBasePathV2()))
+        .requiredPartitions(Collections.emptySet())
+        .skipCompaction(false)
+        .build();
+
+    // default read the latest commit
+    IncrementalInputSplits.Result splits = 
incrementalInputSplits.inputSplitsCDC(metaClient, null);
+
+    List<RowData> result = readData(inputFormat, 
splits.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
+
+    final String actual = TestData.rowDataToString(result);
+    final String expected = "["
+        + "-U[id1, Danny, 23, 1970-01-01T00:00:00.001, par1], "
+        + "+U[id1, Danny, 24, 1970-01-01T00:00:00.001, par1], "
+        + "-U[id2, Stephen, 33, 1970-01-01T00:00:00.002, par1], "
+        + "+U[id2, Stephen, 34, 1970-01-01T00:00:00.002, par1], "
+        + "-D[id3, Julian, 53, 1970-01-01T00:00:00.003, par2], "
+        + "-D[id5, Sophia, 18, 1970-01-01T00:00:00.005, par3]]";
+    assertThat(actual, is(expected));
+  }
+
+  @ParameterizedTest
+  @EnumSource(value = HoodieCDCSupplementalLoggingMode.class)
+  void testReadFromEarliestWithChangeLogCOW(HoodieCDCSupplementalLoggingMode 
mode) throws Exception {
+    Map<String, String> options = new HashMap<>();
+    options.put(FlinkOptions.CDC_ENABLED.key(), "true");
+    options.put(FlinkOptions.SUPPLEMENTAL_LOGGING_MODE.key(), mode.getValue());
+    options.put(FlinkOptions.READ_START_COMMIT.key(), "earliest");
+    beforeEach(HoodieTableType.COPY_ON_WRITE, options);
+
+    // write the insert data sets
+    TestData.writeData(TestData.DATA_SET_INSERT, conf);
+
+    // write another commit to read again
+    TestData.writeData(TestData.DATA_SET_UPDATE_DELETE, conf);
+
+    InputFormat<RowData, ?> inputFormat = 
this.tableSource.getInputFormat(true);
+    assertThat(inputFormat, instanceOf(CdcInputFormat.class));
+    HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
+    IncrementalInputSplits incrementalInputSplits = 
IncrementalInputSplits.builder()
+        .rowType(TestConfigurations.ROW_TYPE)
+        .conf(conf)
+        .path(FilePathUtils.toFlinkPath(metaClient.getBasePathV2()))
+        .requiredPartitions(new HashSet<>(Arrays.asList("par1", "par2", 
"par3", "par4")))
+        .skipCompaction(false)
+        .build();
+
+    // default read the latest commit
+    IncrementalInputSplits.Result splits = 
incrementalInputSplits.inputSplitsCDC(metaClient, null);
+
+    List<RowData> result = readData(inputFormat, 
splits.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
+
+    final String actual = TestData.rowDataToString(result);
+    final String expected = "["
+        + "+I[id1, Danny, 24, 1970-01-01T00:00:00.001, par1], "
+        + "+I[id2, Stephen, 34, 1970-01-01T00:00:00.002, par1], "
+        + "+I[id4, Fabian, 31, 1970-01-01T00:00:00.004, par2], "
+        + "+I[id6, Emma, 20, 1970-01-01T00:00:00.006, par3], "
+        + "+I[id7, Bob, 44, 1970-01-01T00:00:00.007, par4], "
+        + "+I[id8, Han, 56, 1970-01-01T00:00:00.008, par4]]";
+    assertThat(actual, is(expected));
+  }
+
   @ParameterizedTest
   @EnumSource(value = HoodieTableType.class)
   void testReadWithPartitionPrune(HoodieTableType tableType) throws Exception {
@@ -631,10 +720,14 @@ public class TestInputFormat {
         conf);
   }
 
-  @SuppressWarnings("unchecked, rawtypes")
+  @SuppressWarnings("rawtypes")
   private static List<RowData> readData(InputFormat inputFormat) throws 
IOException {
     InputSplit[] inputSplits = inputFormat.createInputSplits(1);
+    return readData(inputFormat, inputSplits);
+  }
 
+  @SuppressWarnings("unchecked, rawtypes")
+  private static List<RowData> readData(InputFormat inputFormat, InputSplit[] 
inputSplits) throws IOException {
     List<RowData> result = new ArrayList<>();
 
     for (InputSplit inputSplit : inputSplits) {
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index 8e1dd9964c..98053f7821 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -228,6 +228,36 @@ public class TestData {
           TimestampData.fromEpochMillis(8000), StringData.fromString("par4"))
   );
 
+  // changelog details of test_source.data and test_source_2.data
+  public static List<RowData> DATA_SET_SOURCE_CHANGELOG = Arrays.asList(
+      updateBeforeRow(StringData.fromString("id1"), 
StringData.fromString("Danny"), 23,
+          TimestampData.fromEpochMillis(1000), StringData.fromString("par1")),
+      updateAfterRow(StringData.fromString("id1"), 
StringData.fromString("Danny"), 24,
+          TimestampData.fromEpochMillis(1000), StringData.fromString("par1")),
+      updateBeforeRow(StringData.fromString("id2"), 
StringData.fromString("Stephen"), 33,
+          TimestampData.fromEpochMillis(2000), StringData.fromString("par1")),
+      updateAfterRow(StringData.fromString("id2"), 
StringData.fromString("Stephen"), 34,
+          TimestampData.fromEpochMillis(2000), StringData.fromString("par1")),
+      updateBeforeRow(StringData.fromString("id3"), 
StringData.fromString("Julian"), 53,
+          TimestampData.fromEpochMillis(3000), StringData.fromString("par2")),
+      updateAfterRow(StringData.fromString("id3"), 
StringData.fromString("Julian"), 54,
+          TimestampData.fromEpochMillis(3000), StringData.fromString("par2")),
+      updateBeforeRow(StringData.fromString("id4"), 
StringData.fromString("Fabian"), 31,
+          TimestampData.fromEpochMillis(4000), StringData.fromString("par2")),
+      updateAfterRow(StringData.fromString("id4"), 
StringData.fromString("Fabian"), 32,
+          TimestampData.fromEpochMillis(4000), StringData.fromString("par2")),
+      updateBeforeRow(StringData.fromString("id5"), 
StringData.fromString("Sophia"), 18,
+          TimestampData.fromEpochMillis(5000), StringData.fromString("par3")),
+      updateAfterRow(StringData.fromString("id5"), 
StringData.fromString("Sophia"), 18,
+          TimestampData.fromEpochMillis(5000), StringData.fromString("par3")),
+      insertRow(StringData.fromString("id9"), StringData.fromString("Jane"), 
19,
+          TimestampData.fromEpochMillis(6000), StringData.fromString("par3")),
+      insertRow(StringData.fromString("id10"), StringData.fromString("Ella"), 
38,
+          TimestampData.fromEpochMillis(7000), StringData.fromString("par4")),
+      insertRow(StringData.fromString("id11"), 
StringData.fromString("Phoebe"), 52,
+          TimestampData.fromEpochMillis(8000), StringData.fromString("par4"))
+  );
+
   // data set of test_source.data with partition 'par1' overwrite
   public static List<RowData> DATA_SET_SOURCE_INSERT_OVERWRITE = Arrays.asList(
       insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 
24,
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCRelation.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCRelation.scala
index c4c8f10360..4e90303b3b 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCRelation.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCRelation.scala
@@ -27,9 +27,9 @@ import org.apache.hudi.common.table.cdc.HoodieCDCOperation._
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.table.TableSchemaResolver
 import org.apache.hudi.common.table.cdc.HoodieCDCExtractor
+import org.apache.hudi.common.table.log.InstantRange
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.internal.schema.InternalSchema
-
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
@@ -73,7 +73,14 @@ class CDCRelation(
 
   val tableStructSchema: StructType = 
AvroConversionUtils.convertAvroSchemaToStructType(tableAvroSchema)
 
-  val cdcExtractor: HoodieCDCExtractor = new HoodieCDCExtractor(metaClient, 
startInstant, endInstant)
+  val cdcExtractor: HoodieCDCExtractor =
+    new HoodieCDCExtractor(
+      metaClient,
+      InstantRange.builder()
+        .startInstant(startInstant)
+        .endInstant(endInstant)
+        .nullableBoundary(true)
+        .rangeType(InstantRange.RangeType.OPEN_CLOSE).build())
 
   override final def needConversion: Boolean = false
 
@@ -98,9 +105,9 @@ class CDCRelation(
       hadoopConf = spark.sessionState.newHadoopConf()
     )
 
-    val changes = cdcExtractor.extractCDCFileSplits().values().asScala.map { 
pairs =>
+    val changes = cdcExtractor.extractCDCFileSplits().values().asScala.map { 
splits =>
       HoodieCDCFileGroupSplit(
-        pairs.asScala.map(pair => (pair.getLeft, 
pair.getRight)).sortBy(_._1).toArray
+        splits.asScala.sorted.toArray
       )
     }
     val cdcRdd = new HoodieCDCRDD(
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala
index 49d5e19b16..7d96689ae0 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala
@@ -30,7 +30,6 @@ import org.apache.hudi.common.table.cdc.HoodieCDCInferCase._
 import org.apache.hudi.common.table.cdc.HoodieCDCOperation._
 import org.apache.hudi.common.table.cdc.{HoodieCDCFileSplit, 
HoodieCDCSupplementalLoggingMode, HoodieCDCUtils}
 import org.apache.hudi.common.table.log.HoodieCDCLogRecordIterator
-import org.apache.hudi.common.table.timeline.HoodieInstant
 import org.apache.hudi.common.util.ValidationUtils.checkState
 import org.apache.hudi.config.{HoodiePayloadConfig, HoodieWriteConfig}
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions
@@ -67,10 +66,9 @@ import scala.collection.mutable
 
 /**
  * The split that will be processed by spark task.
+ * The [[changes]] should be sorted first.
  */
-case class HoodieCDCFileGroupSplit(
-    commitToChanges: Array[(HoodieInstant, HoodieCDCFileSplit)]
-)
+case class HoodieCDCFileGroupSplit(changes: Array[HoodieCDCFileSplit])
 
 /**
  * The Spark [[Partition]]'s implementation.
@@ -94,9 +92,7 @@ class HoodieCDCRDD(
 
   private val confBroadcast = spark.sparkContext.broadcast(new 
SerializableWritable(hadoopConf))
 
-  private val cdcSupplementalLoggingMode = 
HoodieCDCSupplementalLoggingMode.parse(
-    metaClient.getTableConfig.cdcSupplementalLoggingMode
-  )
+  private val cdcSupplementalLoggingMode = 
metaClient.getTableConfig.cdcSupplementalLoggingMode
 
   private val props = HoodieFileIndex.getConfigProperties(spark, Map.empty)
 
@@ -158,7 +154,7 @@ class HoodieCDCRDD(
         .build();
       HoodieTableState(
         pathToString(basePath),
-        split.commitToChanges.map(_._1.getTimestamp).max,
+        split.changes.last.getInstant,
         recordKeyField,
         preCombineFieldOpt,
         usesVirtualKeys = false,
@@ -201,10 +197,10 @@ class HoodieCDCRDD(
     private lazy val projection: UnsafeProjection = 
generateUnsafeProjection(cdcSchema, requiredCdcSchema)
 
     // Iterator on cdc file
-    private val cdcFileIter = split.commitToChanges.sortBy(_._1).iterator
+    private val cdcFileIter = split.changes.iterator
 
     // The instant that is currently being processed
-    private var currentInstant: HoodieInstant = _
+    private var currentInstant: String = _
 
     // The change file that is currently being processed
     private var currentChangeFile: HoodieCDCFileSplit = _
@@ -420,9 +416,9 @@ class HoodieCDCRDD(
       }
 
       if (cdcFileIter.hasNext) {
-        val pair = cdcFileIter.next()
-        currentInstant = pair._1
-        currentChangeFile = pair._2
+        val split = cdcFileIter.next()
+        currentInstant = split.getInstant
+        currentChangeFile = split
         currentChangeFile.getCdcInferCase match {
           case BASE_FILE_INSERT =>
             assert(currentChangeFile.getCdcFile != null)
@@ -480,23 +476,23 @@ class HoodieCDCRDD(
       recordToLoad = currentChangeFile.getCdcInferCase match {
         case BASE_FILE_INSERT =>
           InternalRow.fromSeq(Array(
-            CDCRelation.CDC_OPERATION_INSERT, 
convertToUTF8String(currentInstant.getTimestamp),
+            CDCRelation.CDC_OPERATION_INSERT, 
convertToUTF8String(currentInstant),
             null, null))
         case BASE_FILE_DELETE =>
           InternalRow.fromSeq(Array(
-            CDCRelation.CDC_OPERATION_DELETE, 
convertToUTF8String(currentInstant.getTimestamp),
+            CDCRelation.CDC_OPERATION_DELETE, 
convertToUTF8String(currentInstant),
             null, null))
         case LOG_FILE =>
           InternalRow.fromSeq(Array(
-            null, convertToUTF8String(currentInstant.getTimestamp),
+            null, convertToUTF8String(currentInstant),
             null, null))
         case AS_IS =>
           InternalRow.fromSeq(Array(
-            null, convertToUTF8String(currentInstant.getTimestamp),
+            null, convertToUTF8String(currentInstant),
             null, null))
         case REPLACE_COMMIT =>
           InternalRow.fromSeq(Array(
-            CDCRelation.CDC_OPERATION_DELETE, 
convertToUTF8String(currentInstant.getTimestamp),
+            CDCRelation.CDC_OPERATION_DELETE, 
convertToUTF8String(currentInstant),
             null, null))
       }
     }


Reply via email to