This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 57c3f632a203 refactor: consolidate common utility classes for Flink CDC read (#18436)
57c3f632a203 is described below

commit 57c3f632a203402b3b6d64d72389f2bf5d94f84e
Author: Peter Huang <[email protected]>
AuthorDate: Thu Apr 2 18:53:52 2026 -0700

    refactor: consolidate common utility classes for Flink CDC read (#18436)
    
    * refactor: consolidate common utility classes for Flink CDC read
    * resolve shuo's comments
---
 .../function/HoodieCdcSplitReaderFunction.java     | 785 +-------------------
 .../hudi/table/format/cdc/CdcImageManager.java     | 197 ++++++
 .../hudi/table/format/cdc/CdcInputFormat.java      | 788 +--------------------
 .../apache/hudi/table/format/cdc/CdcIterators.java | 667 +++++++++++++++++
 .../function/TestHoodieCdcSplitReaderFunction.java |  10 +-
 5 files changed, 938 insertions(+), 1509 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/HoodieCdcSplitReaderFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/HoodieCdcSplitReaderFunction.java
index 4d57ce18f3ca..0b0d4f6191bc 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/HoodieCdcSplitReaderFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/HoodieCdcSplitReaderFunction.java
@@ -18,97 +18,58 @@
 
 package org.apache.hudi.source.reader.function;
 
-import org.apache.hudi.client.model.HoodieFlinkRecord;
-import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.engine.HoodieReaderContext;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieLogFile;
-import org.apache.hudi.common.model.HoodieOperation;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.schema.HoodieSchemaCache;
-import org.apache.hudi.common.schema.HoodieSchemaField;
 import org.apache.hudi.common.schema.HoodieSchemaUtils;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.cdc.HoodieCDCFileSplit;
 import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
 import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
-import org.apache.hudi.common.table.log.HoodieCDCLogRecordIterator;
-import org.apache.hudi.common.table.read.BufferedRecord;
-import org.apache.hudi.common.table.read.BufferedRecordMerger;
-import org.apache.hudi.common.table.read.BufferedRecordMergerFactory;
-import org.apache.hudi.common.table.read.BufferedRecords;
-import org.apache.hudi.common.table.read.DeleteContext;
 import org.apache.hudi.common.table.read.HoodieFileGroupReader;
-import org.apache.hudi.common.util.CollectionUtils;
-import org.apache.hudi.common.util.ConfigUtils;
-import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.ClosableIterator;
-import org.apache.hudi.common.util.collection.ExternalSpillableMap;
-import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.HadoopConfigurations;
 import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.source.ExpressionPredicates;
 import org.apache.hudi.source.reader.BatchRecords;
 import org.apache.hudi.source.reader.HoodieRecordWithPosition;
 import org.apache.hudi.source.split.HoodieCdcSourceSplit;
 import org.apache.hudi.source.split.HoodieSourceSplit;
-import org.apache.hudi.storage.HoodieStorage;
-import org.apache.hudi.storage.HoodieStorageUtils;
-import org.apache.hudi.storage.StoragePath;
-import org.apache.hudi.table.format.FormatUtils;
 import org.apache.hudi.table.format.FilePathUtils;
-import org.apache.hudi.table.format.FlinkReaderContextFactory;
+import org.apache.hudi.table.format.FormatUtils;
 import org.apache.hudi.table.format.InternalSchemaManager;
 import org.apache.hudi.table.format.RecordIterators;
+import org.apache.hudi.table.format.cdc.CdcImageManager;
 import org.apache.hudi.table.format.cdc.CdcInputFormat;
+import org.apache.hudi.table.format.cdc.CdcIterators;
 import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
 import org.apache.hudi.table.format.mor.MergeOnReadTableState;
-import org.apache.hudi.util.AvroToRowDataConverters;
-import org.apache.hudi.util.HoodieSchemaConverter;
 import org.apache.hudi.util.StreamerUtil;
-import org.apache.hudi.util.RowDataProjection;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
 import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.types.RowKind;
 import org.apache.hadoop.fs.Path;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
 import java.io.IOException;
-import java.util.Arrays;
 import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
 import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.TreeMap;
+import java.util.List;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
-import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS;
-import static org.apache.hudi.table.format.FormatUtils.buildAvroRecordBySchema;
-
 /**
  * CDC reader function for source V2. Reads CDC splits ({@link HoodieCdcSourceSplit}) and
- * emits change-log {@link RowData} records tagged with the appropriate {@link RowKind}.
+ * emits change-log {@link RowData} records tagged with the appropriate {@link org.apache.flink.types.RowKind}.
  *
  * <p>The implementation mirrors the logic in {@link CdcInputFormat}, adapted for the
  * {@link SplitReaderFunction} contract.
@@ -158,10 +119,9 @@ public class HoodieCdcSplitReaderFunction extends AbstractSplitReaderFunction {
     HoodieCdcSourceSplit cdcSplit = (HoodieCdcSourceSplit) split;
 
     HoodieCDCSupplementalLoggingMode mode = 
OptionsResolver.getCDCSupplementalLoggingMode(conf);
-    HoodieTableMetaClient client = getMetaClient();
-    HoodieWriteConfig wConfig = getWriteConfig();
 
-    ImageManager imageManager = new ImageManager(tableState.getRowType(), 
wConfig, this::getFileSliceIterator);
+    CdcImageManager imageManager = new CdcImageManager(
+        tableState.getRowType(), getWriteConfig(), this::getFileSliceIterator);
 
     Function<HoodieCDCFileSplit, ClosableIterator<RowData>> recordIteratorFunc 
=
         cdcFileSplit -> createRecordIteratorSafe(
@@ -169,10 +129,9 @@ public class HoodieCdcSplitReaderFunction extends AbstractSplitReaderFunction {
             cdcSplit.getMaxCompactionMemoryInBytes(),
             cdcFileSplit,
             mode,
-            imageManager,
-            client);
+            imageManager);
 
-    currentIterator = new CdcFileSplitsIterator(cdcSplit.getChanges(), 
imageManager, recordIteratorFunc);
+    currentIterator = new 
CdcIterators.CdcFileSplitsIterator(cdcSplit.getChanges(), imageManager, 
recordIteratorFunc);
     BatchRecords<RowData> records = BatchRecords.forRecords(
         split.splitId(), currentIterator, split.getFileOffset(), 
split.getConsumed());
     records.seek(split.getConsumed());
@@ -212,10 +171,9 @@ public class HoodieCdcSplitReaderFunction extends AbstractSplitReaderFunction {
       long maxCompactionMemoryInBytes,
       HoodieCDCFileSplit fileSplit,
       HoodieCDCSupplementalLoggingMode mode,
-      ImageManager imageManager,
-      HoodieTableMetaClient client) {
+      CdcImageManager imageManager) {
     try {
-      return createRecordIterator(tablePath, maxCompactionMemoryInBytes, 
fileSplit, mode, imageManager, client);
+      return createRecordIterator(tablePath, maxCompactionMemoryInBytes, 
fileSplit, mode, imageManager);
     } catch (IOException e) {
       throw new HoodieException("Failed to create CDC record iterator for 
split: " + fileSplit, e);
     }
@@ -226,8 +184,7 @@ public class HoodieCdcSplitReaderFunction extends AbstractSplitReaderFunction {
       long maxCompactionMemoryInBytes,
       HoodieCDCFileSplit fileSplit,
       HoodieCDCSupplementalLoggingMode mode,
-      ImageManager imageManager,
-      HoodieTableMetaClient client) throws IOException {
+      CdcImageManager imageManager) throws IOException {
 
     final HoodieSchema tableSchema = 
HoodieSchema.parse(tableState.getTableSchema());
     final HoodieSchema requiredSchema = 
HoodieSchema.parse(tableState.getRequiredSchema());
@@ -236,29 +193,33 @@ public class HoodieCdcSplitReaderFunction extends AbstractSplitReaderFunction {
         ValidationUtils.checkState(fileSplit.getCdcFiles() != null && 
fileSplit.getCdcFiles().size() == 1,
             "CDC file path should exist and be singleton for 
BASE_FILE_INSERT");
         String path = new Path(tablePath, 
fileSplit.getCdcFiles().get(0)).toString();
-        return new AddBaseFileIterator(getBaseFileIterator(path));
+        return new CdcIterators.AddBaseFileIterator(getBaseFileIterator(path));
       }
       case BASE_FILE_DELETE: {
         ValidationUtils.checkState(fileSplit.getBeforeFileSlice().isPresent(),
             "Before file slice should exist for BASE_FILE_DELETE");
-        FileSlice fileSlice = fileSplit.getBeforeFileSlice().get();
-        MergeOnReadInputSplit inputSplit = 
CdcInputFormat.fileSlice2Split(tablePath, fileSlice, 
maxCompactionMemoryInBytes);
-        return new RemoveBaseFileIterator(tableState.getRequiredRowType(), 
tableState.getRequiredPositions(), getFileSliceIterator(inputSplit));
+        MergeOnReadInputSplit inputSplit = CdcIterators.fileSlice2Split(
+            tablePath, fileSplit.getBeforeFileSlice().get(), 
maxCompactionMemoryInBytes);
+        return new CdcIterators.RemoveBaseFileIterator(
+            tableState.getRequiredRowType(), 
tableState.getRequiredPositions(), getFileSliceIterator(inputSplit));
       }
       case AS_IS: {
         HoodieSchema dataSchema = 
HoodieSchemaUtils.removeMetadataFields(tableSchema);
         HoodieSchema cdcSchema = 
HoodieCDCUtils.schemaBySupplementalLoggingMode(mode, dataSchema);
         switch (mode) {
           case DATA_BEFORE_AFTER:
-            return new BeforeAfterImageIterator(
-                getHadoopConf(), tablePath, tableSchema, requiredSchema, 
tableState.getRequiredRowType(), cdcSchema, fileSplit);
+            return new CdcIterators.BeforeAfterImageIterator(
+                getHadoopConf(), tablePath, tableSchema, requiredSchema,
+                tableState.getRequiredRowType(), cdcSchema, fileSplit);
           case DATA_BEFORE:
-            return new BeforeImageIterator(
-                conf, getHadoopConf(), tablePath, tableSchema, requiredSchema, 
tableState.getRequiredRowType(),
+            return new CdcIterators.BeforeImageIterator(
+                getHadoopConf(), tablePath, tableSchema, requiredSchema,
+                tableState.getRequiredRowType(), 
tableState.getRequiredPositions(),
                 maxCompactionMemoryInBytes, cdcSchema, fileSplit, 
imageManager);
           case OP_KEY_ONLY:
-            return new RecordKeyImageIterator(
-                conf, getHadoopConf(), tablePath, tableSchema, requiredSchema, 
tableState.getRequiredRowType(),
+            return new CdcIterators.RecordKeyImageIterator(
+                getHadoopConf(), tablePath, tableSchema, requiredSchema,
+                tableState.getRequiredRowType(), 
tableState.getRequiredPositions(),
                 maxCompactionMemoryInBytes, cdcSchema, fileSplit, 
imageManager);
           default:
             throw new AssertionError("Unexpected CDC supplemental logging 
mode: " + mode);
@@ -268,16 +229,17 @@ public class HoodieCdcSplitReaderFunction extends AbstractSplitReaderFunction {
         ValidationUtils.checkState(fileSplit.getCdcFiles() != null && 
fileSplit.getCdcFiles().size() == 1,
             "CDC file path should exist and be singleton for LOG_FILE");
         String logFilePath = new Path(tablePath, 
fileSplit.getCdcFiles().get(0)).toString();
-        MergeOnReadInputSplit split = 
CdcInputFormat.singleLogFile2Split(tablePath, logFilePath, 
maxCompactionMemoryInBytes);
+        MergeOnReadInputSplit split = 
CdcIterators.singleLogFile2Split(tablePath, logFilePath, 
maxCompactionMemoryInBytes);
         ClosableIterator<HoodieRecord<RowData>> recordIterator = 
getFileSliceHoodieRecordIterator(split);
-        return new DataLogFileIterator(
-            maxCompactionMemoryInBytes, imageManager, fileSplit, tableSchema, 
tableState.getRequiredRowType(), tableState.getRequiredPositions(),
-            recordIterator, client, getWriteConfig());
+        return new CdcIterators.DataLogFileIterator(
+            maxCompactionMemoryInBytes, imageManager, fileSplit, tableSchema,
+            tableState.getRequiredRowType(), tableState.getRequiredPositions(),
+            recordIterator, getMetaClient(), getWriteConfig());
       }
       case REPLACE_COMMIT: {
-        return new ReplaceCommitIterator(
-            conf, tablePath, tableState.getRequiredRowType(), 
tableState.getRequiredPositions(), maxCompactionMemoryInBytes,
-            fileSplit, this::getFileSliceIterator);
+        return new CdcIterators.ReplaceCommitIterator(
+            tablePath, tableState.getRequiredRowType(), 
tableState.getRequiredPositions(),
+            maxCompactionMemoryInBytes, fileSplit, this::getFileSliceIterator);
       }
       default:
         throw new AssertionError("Unexpected CDC file split infer case: " + 
fileSplit.getCdcInferCase());
@@ -355,689 +317,10 @@ public class HoodieCdcSplitReaderFunction extends AbstractSplitReaderFunction {
             .orElse(Collections.emptyList()));
   }
 
-  private static int[] computeRequiredPositions(RowType rowType, RowType 
requiredRowType) {
-    List<String> allNames = rowType.getFieldNames();
-    return requiredRowType.getFieldNames().stream()
-        .map(allNames::indexOf)
-        .mapToInt(i -> i)
-        .toArray();
-  }
-
   private HoodieTableMetaClient getMetaClient() {
     if (metaClient == null) {
       metaClient = StreamerUtil.metaClientForReader(conf, getHadoopConf());
     }
     return metaClient;
   }
-
-  // -------------------------------------------------------------------------
-  //  Inner iterators (adapted from CdcInputFormat inner classes)
-  // -------------------------------------------------------------------------
-
-  /** Iterates over an ordered list of {@link HoodieCDCFileSplit}s, delegating 
record reading to a factory. */
-  private static class CdcFileSplitsIterator implements 
ClosableIterator<RowData> {
-    private ImageManager imageManager;
-    private final Iterator<HoodieCDCFileSplit> fileSplitIterator;
-    private final Function<HoodieCDCFileSplit, ClosableIterator<RowData>> 
recordIteratorFunc;
-    private ClosableIterator<RowData> recordIterator;
-
-    CdcFileSplitsIterator(
-        HoodieCDCFileSplit[] changes,
-        ImageManager imageManager,
-        Function<HoodieCDCFileSplit, ClosableIterator<RowData>> 
recordIteratorFunc) {
-      this.fileSplitIterator = Arrays.asList(changes).iterator();
-      this.imageManager = imageManager;
-      this.recordIteratorFunc = recordIteratorFunc;
-    }
-
-    @Override
-    public boolean hasNext() {
-      if (recordIterator != null) {
-        if (recordIterator.hasNext()) {
-          return true;
-        } else {
-          recordIterator.close();
-          recordIterator = null;
-        }
-      }
-      if (fileSplitIterator.hasNext()) {
-        recordIterator = recordIteratorFunc.apply(fileSplitIterator.next());
-        return recordIterator.hasNext();
-      }
-      return false;
-    }
-
-    @Override
-    public RowData next() {
-      return recordIterator.next();
-    }
-
-    @Override
-    public void close() {
-      if (recordIterator != null) {
-        recordIterator.close();
-      }
-      if (imageManager != null) {
-        imageManager.close();
-        imageManager = null;
-      }
-    }
-  }
-
-  /** Wraps a base-file parquet iterator and marks every record as {@link 
RowKind#INSERT}. */
-  private static class AddBaseFileIterator implements 
ClosableIterator<RowData> {
-    private ClosableIterator<RowData> nested;
-    private RowData currentRecord;
-
-    AddBaseFileIterator(ClosableIterator<RowData> nested) {
-      this.nested = nested;
-    }
-
-    @Override
-    public boolean hasNext() {
-      if (nested.hasNext()) {
-        currentRecord = nested.next();
-        currentRecord.setRowKind(RowKind.INSERT);
-        return true;
-      }
-      return false;
-    }
-
-    @Override
-    public RowData next() {
-      return currentRecord;
-    }
-
-    @Override
-    public void close() {
-      if (nested != null) {
-        nested.close();
-        nested = null;
-      }
-    }
-  }
-
-  /** Wraps a file-slice iterator and marks every record as {@link 
RowKind#DELETE}, with projection. */
-  private static class RemoveBaseFileIterator implements 
ClosableIterator<RowData> {
-    private ClosableIterator<RowData> nested;
-    private final RowDataProjection projection;
-
-    RemoveBaseFileIterator(RowType requiredRowType, int[] requiredPositions, 
ClosableIterator<RowData> iterator) {
-      this.nested = iterator;
-      this.projection = RowDataProjection.instance(requiredRowType, 
requiredPositions);
-    }
-
-    @Override
-    public boolean hasNext() {
-      return nested.hasNext();
-    }
-
-    @Override
-    public RowData next() {
-      RowData row = nested.next();
-      row.setRowKind(RowKind.DELETE);
-      return projection.project(row);
-    }
-
-    @Override
-    public void close() {
-      if (nested != null) {
-        nested.close();
-        nested = null;
-      }
-    }
-  }
-
-  /**
-   * Handles the {@code LOG_FILE} CDC inference case: compares records from 
the log file
-   * against before-image snapshots to emit 
INSERT/UPDATE_BEFORE/UPDATE_AFTER/DELETE events.
-   */
-  private static class DataLogFileIterator implements 
ClosableIterator<RowData> {
-    private final HoodieSchema tableSchema;
-    private final long maxCompactionMemoryInBytes;
-    private final ImageManager imageManager;
-    private final RowDataProjection projection;
-    private final BufferedRecordMerger recordMerger;
-    private final ClosableIterator<HoodieRecord<RowData>> logRecordIterator;
-    private final DeleteContext deleteContext;
-    private final HoodieReaderContext<RowData> readerContext;
-    private final String[] orderingFields;
-    private final TypedProperties props;
-
-    private ExternalSpillableMap<String, byte[]> beforeImages;
-    private RowData currentImage;
-    private RowData sideImage;
-
-    DataLogFileIterator(
-        long maxCompactionMemoryInBytes,
-        ImageManager imageManager,
-        HoodieCDCFileSplit cdcFileSplit,
-        HoodieSchema tableSchema,
-        RowType requiredRowType,
-        int[] requiredPositions,
-        ClosableIterator<HoodieRecord<RowData>> logRecordIterator,
-        HoodieTableMetaClient metaClient,
-        HoodieWriteConfig writeConfig) throws IOException {
-      this.tableSchema = tableSchema;
-      this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes;
-      this.imageManager = imageManager;
-      this.projection = 
HoodieSchemaConverter.convertToRowType(tableSchema).equals(requiredRowType)
-              ? null : RowDataProjection.instance(requiredRowType, 
requiredPositions);
-      this.props = writeConfig.getProps();
-      this.readerContext = new 
FlinkReaderContextFactory(metaClient).getContext();
-      readerContext.initRecordMerger(props);
-      this.orderingFields = ConfigUtils.getOrderingFields(props);
-      this.recordMerger = BufferedRecordMergerFactory.create(
-          readerContext,
-          readerContext.getMergeMode(),
-          false,
-          Option.of(writeConfig.getRecordMerger()),
-          tableSchema,
-          
Option.ofNullable(Pair.of(metaClient.getTableConfig().getPayloadClass(), 
writeConfig.getPayloadClass())),
-          props,
-          metaClient.getTableConfig().getPartialUpdateMode());
-      this.logRecordIterator = logRecordIterator;
-      this.deleteContext = new DeleteContext(props, 
tableSchema).withReaderSchema(tableSchema);
-      initImages(cdcFileSplit, writeConfig);
-    }
-
-    private void initImages(HoodieCDCFileSplit fileSplit, HoodieWriteConfig 
writeConfig) throws IOException {
-      if (fileSplit.getBeforeFileSlice().isPresent() && 
!fileSplit.getBeforeFileSlice().get().isEmpty()) {
-        this.beforeImages = this.imageManager.getOrLoadImages(
-            maxCompactionMemoryInBytes, fileSplit.getBeforeFileSlice().get());
-      } else {
-        this.beforeImages = FormatUtils.spillableMap(writeConfig, 
maxCompactionMemoryInBytes, getClass().getSimpleName());
-      }
-    }
-
-    @Override
-    public boolean hasNext() {
-      if (sideImage != null) {
-        currentImage = sideImage;
-        sideImage = null;
-        return true;
-      }
-      while (logRecordIterator.hasNext()) {
-        HoodieRecord<RowData> record = logRecordIterator.next();
-        RowData existed = 
imageManager.removeImageRecord(record.getRecordKey(), beforeImages);
-        if (isDelete(record)) {
-          if (existed != null) {
-            existed.setRowKind(RowKind.DELETE);
-            currentImage = existed;
-            return true;
-          }
-        } else {
-          if (existed == null) {
-            RowData newRow = record.getData();
-            newRow.setRowKind(RowKind.INSERT);
-            currentImage = newRow;
-            return true;
-          } else {
-            HoodieOperation operation = 
HoodieOperation.fromValue(existed.getRowKind().toByteValue());
-            HoodieRecord<RowData> historyRecord = new 
HoodieFlinkRecord(record.getKey(), operation, existed);
-            HoodieRecord<RowData> merged = mergeRowWithLog(historyRecord, 
record).get();
-            if (merged.getData() != existed) {
-              existed.setRowKind(RowKind.UPDATE_BEFORE);
-              currentImage = existed;
-              RowData mergedRow = merged.getData();
-              mergedRow.setRowKind(RowKind.UPDATE_AFTER);
-              imageManager.updateImageRecord(record.getRecordKey(), 
beforeImages, mergedRow);
-              sideImage = mergedRow;
-              return true;
-            }
-          }
-        }
-      }
-      return false;
-    }
-
-    @Override
-    public RowData next() {
-      return projection != null ? projection.project(currentImage) : 
currentImage;
-    }
-
-    @Override
-    public void close() {
-      logRecordIterator.close();
-      imageManager.close();
-    }
-
-    @SuppressWarnings("unchecked")
-    private Option<HoodieRecord<RowData>> mergeRowWithLog(
-        HoodieRecord<RowData> historyRecord, HoodieRecord<RowData> newRecord) {
-      try {
-        BufferedRecord<RowData> histBuf = BufferedRecords.fromHoodieRecord(
-            historyRecord, tableSchema, readerContext.getRecordContext(), 
props, orderingFields, deleteContext);
-        BufferedRecord<RowData> newBuf = BufferedRecords.fromHoodieRecord(
-            newRecord, tableSchema, readerContext.getRecordContext(), props, 
orderingFields, deleteContext);
-        BufferedRecord<RowData> merged = recordMerger.finalMerge(histBuf, 
newBuf);
-        return Option.ofNullable(readerContext.getRecordContext()
-            .constructHoodieRecord(merged, historyRecord.getPartitionPath()));
-      } catch (IOException e) {
-        throw new HoodieIOException("Merge base and delta payloads exception", 
e);
-      }
-    }
-
-    private boolean isDelete(HoodieRecord<RowData> record) {
-      return record.isDelete(deleteContext, CollectionUtils.emptyProps());
-    }
-  }
-
-  /**
-   * Base iterator for CDC log files stored with supplemental logging (AS_IS 
inference case).
-   * Reads {@link HoodieCDCLogRecordIterator} and resolves before/after images 
using
-   * subclass-specific logic.
-   */
-  private abstract static class BaseImageIterator implements 
ClosableIterator<RowData> {
-    private final HoodieSchema requiredSchema;
-    private final int[] requiredPos;
-    private final GenericRecordBuilder recordBuilder;
-    private final AvroToRowDataConverters.AvroToRowDataConverter 
avroToRowDataConverter;
-    private HoodieCDCLogRecordIterator cdcItr;
-
-    private GenericRecord cdcRecord;
-    private RowData sideImage;
-    private RowData currentImage;
-
-    BaseImageIterator(
-        org.apache.hadoop.conf.Configuration hadoopConf,
-        String tablePath,
-        HoodieSchema tableSchema,
-        HoodieSchema requiredSchema,
-        RowType requiredRowType,
-        HoodieSchema cdcSchema,
-        HoodieCDCFileSplit fileSplit) {
-      this.requiredSchema = requiredSchema;
-      this.requiredPos = computeRequiredPos(tableSchema, requiredSchema);
-      this.recordBuilder = new 
GenericRecordBuilder(requiredSchema.getAvroSchema());
-      this.avroToRowDataConverter = 
AvroToRowDataConverters.createRowConverter(requiredRowType);
-
-      StoragePath hadoopTablePath = new StoragePath(tablePath);
-      HoodieStorage storage = HoodieStorageUtils.getStorage(
-          tablePath, HadoopFSUtils.getStorageConf(hadoopConf));
-      HoodieLogFile[] cdcLogFiles = fileSplit.getCdcFiles().stream()
-          .map(cdcFile -> {
-            try {
-              return new HoodieLogFile(storage.getPathInfo(new 
StoragePath(hadoopTablePath, cdcFile)));
-            } catch (IOException e) {
-              throw new HoodieIOException("Failed to get file status for CDC 
log: " + cdcFile, e);
-            }
-          })
-          .toArray(HoodieLogFile[]::new);
-      this.cdcItr = new HoodieCDCLogRecordIterator(storage, cdcLogFiles, 
cdcSchema);
-    }
-
-    private static int[] computeRequiredPos(HoodieSchema tableSchema, 
HoodieSchema requiredSchema) {
-      HoodieSchema dataSchema = 
HoodieSchemaUtils.removeMetadataFields(tableSchema);
-      List<String> fields = dataSchema.getFields().stream()
-          .map(HoodieSchemaField::name)
-          .collect(Collectors.toList());
-      return requiredSchema.getFields().stream()
-          .map(f -> fields.indexOf(f.name()))
-          .mapToInt(i -> i)
-          .toArray();
-    }
-
-    @Override
-    public boolean hasNext() {
-      if (sideImage != null) {
-        currentImage = sideImage;
-        sideImage = null;
-        return true;
-      } else if (cdcItr.hasNext()) {
-        cdcRecord = (GenericRecord) cdcItr.next();
-        String op = String.valueOf(cdcRecord.get(0));
-        resolveImage(op);
-        return true;
-      }
-      return false;
-    }
-
-    protected abstract RowData getAfterImage(RowKind rowKind, GenericRecord 
cdcRecord);
-
-    protected abstract RowData getBeforeImage(RowKind rowKind, GenericRecord 
cdcRecord);
-
-    @Override
-    public RowData next() {
-      return currentImage;
-    }
-
-    @Override
-    public void close() {
-      if (cdcItr != null) {
-        cdcItr.close();
-        cdcItr = null;
-      }
-    }
-
-    private void resolveImage(String op) {
-      switch (op) {
-        case "i":
-          currentImage = getAfterImage(RowKind.INSERT, cdcRecord);
-          break;
-        case "u":
-          currentImage = getBeforeImage(RowKind.UPDATE_BEFORE, cdcRecord);
-          sideImage = getAfterImage(RowKind.UPDATE_AFTER, cdcRecord);
-          break;
-        case "d":
-          currentImage = getBeforeImage(RowKind.DELETE, cdcRecord);
-          break;
-        default:
-          throw new AssertionError("Unexpected CDC operation: " + op);
-      }
-    }
-
-    protected RowData resolveAvro(RowKind rowKind, GenericRecord avroRecord) {
-      GenericRecord requiredAvroRecord = buildAvroRecordBySchema(
-          avroRecord, requiredSchema, requiredPos, recordBuilder);
-      RowData resolved = (RowData) 
avroToRowDataConverter.convert(requiredAvroRecord);
-      resolved.setRowKind(rowKind);
-      return resolved;
-    }
-  }
-
-  /** Reads CDC log files that contain both before and after images ({@code 
DATA_BEFORE_AFTER} mode). */
-  private static class BeforeAfterImageIterator extends BaseImageIterator {
-    BeforeAfterImageIterator(
-        org.apache.hadoop.conf.Configuration hadoopConf,
-        String tablePath,
-        HoodieSchema tableSchema,
-        HoodieSchema requiredSchema,
-        RowType requiredRowType,
-        HoodieSchema cdcSchema,
-        HoodieCDCFileSplit fileSplit) {
-      super(hadoopConf, tablePath, tableSchema, requiredSchema, 
requiredRowType, cdcSchema, fileSplit);
-    }
-
-    @Override
-    protected RowData getAfterImage(RowKind rowKind, GenericRecord cdcRecord) {
-      return resolveAvro(rowKind, (GenericRecord) cdcRecord.get(3));
-    }
-
-    @Override
-    protected RowData getBeforeImage(RowKind rowKind, GenericRecord cdcRecord) 
{
-      return resolveAvro(rowKind, (GenericRecord) cdcRecord.get(2));
-    }
-  }
-
-  /**
-   * Reads CDC log files containing op + key + before_image ({@code 
DATA_BEFORE} mode).
-   * The after-image is loaded from the after file-slice via the {@link 
ImageManager}.
-   */
-  private static class BeforeImageIterator extends BaseImageIterator {
-    protected ExternalSpillableMap<String, byte[]> afterImages;
-    protected final long maxCompactionMemoryInBytes;
-    protected final RowDataProjection projection;
-    protected final ImageManager imageManager;
-
-    BeforeImageIterator(
-        org.apache.flink.configuration.Configuration flinkConf,
-        org.apache.hadoop.conf.Configuration hadoopConf,
-        String tablePath,
-        HoodieSchema tableSchema,
-        HoodieSchema requiredSchema,
-        RowType requiredRowType,
-        long maxCompactionMemoryInBytes,
-        HoodieSchema cdcSchema,
-        HoodieCDCFileSplit fileSplit,
-        ImageManager imageManager) throws IOException {
-      super(hadoopConf, tablePath, tableSchema, requiredSchema, 
requiredRowType, cdcSchema, fileSplit);
-      this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes;
-      this.projection = RowDataProjection.instance(requiredRowType,
-          computePositions(tableSchema, requiredRowType));
-      this.imageManager = imageManager;
-      initImages(fileSplit);
-    }
-
-    protected void initImages(HoodieCDCFileSplit fileSplit) throws IOException 
{
-      ValidationUtils.checkState(fileSplit.getAfterFileSlice().isPresent(),
-          "Current file slice does not exist for instant: " + 
fileSplit.getInstant());
-      this.afterImages = imageManager.getOrLoadImages(
-          maxCompactionMemoryInBytes, fileSplit.getAfterFileSlice().get());
-    }
-
-    @Override
-    protected RowData getAfterImage(RowKind rowKind, GenericRecord cdcRecord) {
-      String recordKey = cdcRecord.get(1).toString();
-      RowData row = imageManager.getImageRecord(recordKey, afterImages, 
rowKind);
-      row.setRowKind(rowKind);
-      return projection.project(row);
-    }
-
-    @Override
-    protected RowData getBeforeImage(RowKind rowKind, GenericRecord cdcRecord) 
{
-      return resolveAvro(rowKind, (GenericRecord) cdcRecord.get(2));
-    }
-
-    private static int[] computePositions(HoodieSchema tableSchema, RowType 
requiredRowType) {
-      List<String> allFields = tableSchema.getFields().stream()
-          .map(HoodieSchemaField::name)
-          .collect(Collectors.toList());
-      return requiredRowType.getFieldNames().stream()
-          .map(allFields::indexOf)
-          .mapToInt(i -> i)
-          .toArray();
-    }
-  }
-
-  /**
-   * Reads CDC log files containing only op + key ({@code OP_KEY_ONLY} mode).
-   * Both before and after images are loaded from file-slice snapshots via 
{@link ImageManager}.
-   */
-  private static class RecordKeyImageIterator extends BeforeImageIterator {
-    protected ExternalSpillableMap<String, byte[]> beforeImages;
-
-    RecordKeyImageIterator(
-        org.apache.flink.configuration.Configuration flinkConf,
-        org.apache.hadoop.conf.Configuration hadoopConf,
-        String tablePath,
-        HoodieSchema tableSchema,
-        HoodieSchema requiredSchema,
-        RowType requiredRowType,
-        long maxCompactionMemoryInBytes,
-        HoodieSchema cdcSchema,
-        HoodieCDCFileSplit fileSplit,
-        ImageManager imageManager) throws IOException {
-      super(flinkConf, hadoopConf, tablePath, tableSchema, requiredSchema, 
requiredRowType,
-          maxCompactionMemoryInBytes, cdcSchema, fileSplit, imageManager);
-    }
-
-    @Override
-    protected void initImages(HoodieCDCFileSplit fileSplit) throws IOException 
{
-      super.initImages(fileSplit);
-      ValidationUtils.checkState(fileSplit.getBeforeFileSlice().isPresent(),
-          "Before file slice does not exist for instant: " + 
fileSplit.getInstant());
-      this.beforeImages = imageManager.getOrLoadImages(
-          maxCompactionMemoryInBytes, fileSplit.getBeforeFileSlice().get());
-    }
-
-    @Override
-    protected RowData getBeforeImage(RowKind rowKind, GenericRecord cdcRecord) 
{
-      String recordKey = cdcRecord.get(1).toString();
-      RowData row = imageManager.getImageRecord(recordKey, beforeImages, 
rowKind);
-      row.setRowKind(rowKind);
-      return projection.project(row);
-    }
-  }
-
-  /** Handles the {@code REPLACE_COMMIT} CDC inference case: emits all records 
from before-slice as DELETE. */
-  private static class ReplaceCommitIterator implements 
ClosableIterator<RowData> {
-    private final ClosableIterator<RowData> itr;
-    private final RowDataProjection projection;
-
-    ReplaceCommitIterator(
-        org.apache.flink.configuration.Configuration flinkConf,
-        String tablePath,
-        RowType requiredRowType,
-        int[] requiredPositions,
-        long maxCompactionMemoryInBytes,
-        HoodieCDCFileSplit fileSplit,
-        Function<MergeOnReadInputSplit, ClosableIterator<RowData>> 
splitIteratorFunc) {
-      ValidationUtils.checkState(fileSplit.getBeforeFileSlice().isPresent(),
-          "Before file slice does not exist for instant: " + 
fileSplit.getInstant());
-      MergeOnReadInputSplit inputSplit = CdcInputFormat.fileSlice2Split(
-          tablePath, fileSplit.getBeforeFileSlice().get(), 
maxCompactionMemoryInBytes);
-      this.itr = splitIteratorFunc.apply(inputSplit);
-      this.projection = RowDataProjection.instance(requiredRowType, 
requiredPositions);
-    }
-
-    @Override
-    public boolean hasNext() {
-      return itr.hasNext();
-    }
-
-    @Override
-    public RowData next() {
-      RowData row = itr.next();
-      row.setRowKind(RowKind.DELETE);
-      return projection.project(row);
-    }
-
-    @Override
-    public void close() {
-      itr.close();
-    }
-  }
-
-  // -------------------------------------------------------------------------
-  //  ImageManager - caches full-schema row images keyed by record key
-  // -------------------------------------------------------------------------
-
-  /**
-   * Manages serialized before/after image snapshots for a file group, cached 
by instant time.
-   * At most two versions (before and after) are kept in memory; older entries 
are spilled to disk.
-   */
-  private static class ImageManager implements AutoCloseable {
-    private final HoodieWriteConfig writeConfig;
-    private final RowDataSerializer serializer;
-    private final Function<MergeOnReadInputSplit, ClosableIterator<RowData>> 
splitIteratorFunc;
-    private final Map<String, ExternalSpillableMap<String, byte[]>> cache;
-
-    ImageManager(
-        RowType rowType,
-        HoodieWriteConfig writeConfig,
-        Function<MergeOnReadInputSplit, ClosableIterator<RowData>> 
splitIteratorFunc) {
-      this.serializer = new RowDataSerializer(rowType);
-      this.writeConfig = writeConfig;
-      this.splitIteratorFunc = splitIteratorFunc;
-      this.cache = new TreeMap<>();
-    }
-
-    ExternalSpillableMap<String, byte[]> getOrLoadImages(
-        long maxCompactionMemoryInBytes, FileSlice fileSlice) throws 
IOException {
-      final String instant = fileSlice.getBaseInstantTime();
-      if (cache.containsKey(instant)) {
-        return cache.get(instant);
-      }
-      if (cache.size() > 1) {
-        String oldest = cache.keySet().iterator().next();
-        cache.remove(oldest).close();
-      }
-      ExternalSpillableMap<String, byte[]> images = 
loadImageRecords(maxCompactionMemoryInBytes, fileSlice);
-      cache.put(instant, images);
-      return images;
-    }
-
-    private ExternalSpillableMap<String, byte[]> loadImageRecords(
-        long maxCompactionMemoryInBytes, FileSlice fileSlice) throws 
IOException {
-      MergeOnReadInputSplit inputSplit = CdcInputFormat.fileSlice2Split(
-          writeConfig.getBasePath(), fileSlice, maxCompactionMemoryInBytes);
-      ExternalSpillableMap<String, byte[]> imageRecordsMap =
-          FormatUtils.spillableMap(writeConfig, maxCompactionMemoryInBytes, 
getClass().getSimpleName());
-      try (ClosableIterator<RowData> itr = 
splitIteratorFunc.apply(inputSplit)) {
-        while (itr.hasNext()) {
-          RowData row = itr.next();
-          String recordKey = 
row.getString(HOODIE_RECORD_KEY_COL_POS).toString();
-          ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
-          serializer.serialize(row, new BytesArrayOutputView(baos));
-          imageRecordsMap.put(recordKey, baos.toByteArray());
-        }
-      }
-      return imageRecordsMap;
-    }
-
-    RowData getImageRecord(
-        String recordKey, ExternalSpillableMap<String, byte[]> imageCache, 
RowKind rowKind) {
-      byte[] bytes = imageCache.get(recordKey);
-      ValidationUtils.checkState(bytes != null,
-          "Key " + recordKey + " does not exist in current file group image");
-      try {
-        RowData row = serializer.deserialize(new BytesArrayInputView(bytes));
-        row.setRowKind(rowKind);
-        return row;
-      } catch (IOException e) {
-        throw new HoodieException("Failed to deserialize image record for key: 
" + recordKey, e);
-      }
-    }
-
-    void updateImageRecord(
-        String recordKey, ExternalSpillableMap<String, byte[]> imageCache, 
RowData row) {
-      ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
-      try {
-        serializer.serialize(row, new BytesArrayOutputView(baos));
-      } catch (IOException e) {
-        throw new HoodieException("Failed to serialize image record for key: " 
+ recordKey, e);
-      }
-      imageCache.put(recordKey, baos.toByteArray());
-    }
-
-    RowData removeImageRecord(
-        String recordKey, ExternalSpillableMap<String, byte[]> imageCache) {
-      byte[] bytes = imageCache.remove(recordKey);
-      if (bytes == null) {
-        return null;
-      }
-      try {
-        return serializer.deserialize(new BytesArrayInputView(bytes));
-      } catch (IOException e) {
-        throw new HoodieException("Failed to deserialize image record for key: 
" + recordKey, e);
-      }
-    }
-
-    @Override
-    public void close() {
-      cache.values().forEach(ExternalSpillableMap::close);
-      cache.clear();
-    }
-  }
-
-  // -------------------------------------------------------------------------
-  //  I/O view adapters for RowDataSerializer
-  // -------------------------------------------------------------------------
-
-  private static final class BytesArrayInputView extends DataInputStream
-      implements org.apache.flink.core.memory.DataInputView {
-    BytesArrayInputView(byte[] data) {
-      super(new ByteArrayInputStream(data));
-    }
-
-    @Override
-    public void skipBytesToRead(int numBytes) throws IOException {
-      while (numBytes > 0) {
-        int skipped = skipBytes(numBytes);
-        numBytes -= skipped;
-      }
-    }
-  }
-
-  private static final class BytesArrayOutputView extends DataOutputStream
-      implements org.apache.flink.core.memory.DataOutputView {
-    BytesArrayOutputView(ByteArrayOutputStream baos) {
-      super(baos);
-    }
-
-    @Override
-    public void skipBytesToWrite(int numBytes) throws IOException {
-      for (int i = 0; i < numBytes; i++) {
-        write(0);
-      }
-    }
-
-    @Override
-    public void write(org.apache.flink.core.memory.DataInputView source, int 
numBytes) throws IOException {
-      byte[] buffer = new byte[numBytes];
-      source.readFully(buffer);
-      write(buffer);
-    }
-  }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcImageManager.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcImageManager.java
new file mode 100644
index 000000000000..cad323b7fb81
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcImageManager.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cdc;
+
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.ExternalSpillableMap;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.table.format.FormatUtils;
+import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
+
+import lombok.Getter;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.function.Function;
+
+import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS;
+
+/**
+ * Manages serialized before/after image snapshots for a CDC file group, 
cached by instant time.
+ *
+ * <p>At most two versions (before and after) are kept in memory at once; 
older entries are
+ * evicted and spilled to disk via {@link ExternalSpillableMap}.
+ *
+ * <p>Also owns the I/O-view adapters ({@link BytesArrayInputView} / {@link 
BytesArrayOutputView})
+ * used for serialising {@link RowData} records into byte arrays.
+ */
+public class CdcImageManager implements AutoCloseable {
+  @Getter
+  private final HoodieWriteConfig writeConfig;
+  private final RowDataSerializer serializer;
+  private final Function<MergeOnReadInputSplit, ClosableIterator<RowData>> 
splitIteratorFunc;
+  private final Map<String, ExternalSpillableMap<String, byte[]>> cache;
+
+  public CdcImageManager(
+      RowType rowType,
+      HoodieWriteConfig writeConfig,
+      Function<MergeOnReadInputSplit, ClosableIterator<RowData>> 
splitIteratorFunc) {
+    this.serializer = new RowDataSerializer(rowType);
+    this.writeConfig = writeConfig;
+    this.splitIteratorFunc = splitIteratorFunc;
+    this.cache = new TreeMap<>();
+  }
+
+  public ExternalSpillableMap<String, byte[]> getOrLoadImages(
+      long maxCompactionMemoryInBytes,
+      FileSlice fileSlice) throws IOException {
+    final String instant = fileSlice.getBaseInstantTime();
+    if (cache.containsKey(instant)) {
+      return cache.get(instant);
+    }
+    // evict the earliest version when more than two are cached (keep before & 
after)
+    if (cache.size() > 1) {
+      String oldest = cache.keySet().iterator().next();
+      cache.remove(oldest).close();
+    }
+    ExternalSpillableMap<String, byte[]> images = 
loadImageRecords(maxCompactionMemoryInBytes, fileSlice);
+    cache.put(instant, images);
+    return images;
+  }
+
+  private ExternalSpillableMap<String, byte[]> loadImageRecords(
+      long maxCompactionMemoryInBytes,
+      FileSlice fileSlice) throws IOException {
+    MergeOnReadInputSplit inputSplit = CdcIterators.fileSlice2Split(
+        writeConfig.getBasePath(), fileSlice, maxCompactionMemoryInBytes);
+    ExternalSpillableMap<String, byte[]> imageRecordsMap =
+        FormatUtils.spillableMap(writeConfig, maxCompactionMemoryInBytes, 
getClass().getSimpleName());
+    try (ClosableIterator<RowData> itr = splitIteratorFunc.apply(inputSplit)) {
+      while (itr.hasNext()) {
+        RowData row = itr.next();
+        String recordKey = row.getString(HOODIE_RECORD_KEY_COL_POS).toString();
+        ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
+        serializer.serialize(row, new BytesArrayOutputView(baos));
+        imageRecordsMap.put(recordKey, baos.toByteArray());
+      }
+    }
+    return imageRecordsMap;
+  }
+
+  public RowData getImageRecord(
+      String recordKey,
+      ExternalSpillableMap<String, byte[]> imageCache,
+      RowKind rowKind) {
+    byte[] bytes = imageCache.get(recordKey);
+    ValidationUtils.checkState(bytes != null,
+        "Key " + recordKey + " does not exist in current file group image");
+    try {
+      RowData row = serializer.deserialize(new BytesArrayInputView(bytes));
+      row.setRowKind(rowKind);
+      return row;
+    } catch (IOException e) {
+      throw new HoodieException("Failed to deserialize image record for key: " 
+ recordKey, e);
+    }
+  }
+
+  public void updateImageRecord(
+      String recordKey,
+      ExternalSpillableMap<String, byte[]> imageCache,
+      RowData row) {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
+    try {
+      serializer.serialize(row, new BytesArrayOutputView(baos));
+    } catch (IOException e) {
+      throw new HoodieException("Failed to serialize image record for key: " + 
recordKey, e);
+    }
+    imageCache.put(recordKey, baos.toByteArray());
+  }
+
+  public RowData removeImageRecord(
+      String recordKey,
+      ExternalSpillableMap<String, byte[]> imageCache) {
+    byte[] bytes = imageCache.remove(recordKey);
+    if (bytes == null) {
+      return null;
+    }
+    try {
+      return serializer.deserialize(new BytesArrayInputView(bytes));
+    } catch (IOException e) {
+      throw new HoodieException("Failed to deserialize image record for key: " 
+ recordKey, e);
+    }
+  }
+
+  @Override
+  public void close() {
+    cache.values().forEach(ExternalSpillableMap::close);
+    cache.clear();
+  }
+
+  // -------------------------------------------------------------------------
+  //  I/O view adapters for RowDataSerializer
+  // -------------------------------------------------------------------------
+
+  static final class BytesArrayInputView extends DataInputStream implements 
DataInputView {
+    BytesArrayInputView(byte[] data) {
+      super(new ByteArrayInputStream(data));
+    }
+
+    @Override
+    public void skipBytesToRead(int numBytes) throws IOException {
+      while (numBytes > 0) {
+        int skipped = skipBytes(numBytes);
+        numBytes -= skipped;
+      }
+    }
+  }
+
+  static final class BytesArrayOutputView extends DataOutputStream implements 
DataOutputView {
+    BytesArrayOutputView(ByteArrayOutputStream baos) {
+      super(baos);
+    }
+
+    @Override
+    public void skipBytesToWrite(int numBytes) throws IOException {
+      for (int i = 0; i < numBytes; i++) {
+        write(0);
+      }
+    }
+
+    @Override
+    public void write(DataInputView source, int numBytes) throws IOException {
+      byte[] buffer = new byte[numBytes];
+      source.readFully(buffer);
+      write(buffer);
+    }
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
index 5aa29e3d99dc..27bb2b3a56e8 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
@@ -18,92 +18,39 @@
 
 package org.apache.hudi.table.format.cdc;
 
-import org.apache.hudi.client.model.HoodieFlinkRecord;
-import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.engine.HoodieReaderContext;
-import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.model.BaseFile;
 import org.apache.hudi.common.model.FileSlice;
-import org.apache.hudi.common.model.HoodieLogFile;
-import org.apache.hudi.common.model.HoodieOperation;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.schema.HoodieSchemaCache;
-import org.apache.hudi.common.schema.HoodieSchemaField;
 import org.apache.hudi.common.schema.HoodieSchemaUtils;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.cdc.HoodieCDCFileSplit;
 import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
 import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
-import org.apache.hudi.common.table.log.HoodieCDCLogRecordIterator;
-import org.apache.hudi.common.table.read.BufferedRecord;
-import org.apache.hudi.common.table.read.BufferedRecordMerger;
-import org.apache.hudi.common.table.read.BufferedRecordMergerFactory;
-import org.apache.hudi.common.table.read.BufferedRecords;
-import org.apache.hudi.common.table.read.DeleteContext;
 import org.apache.hudi.common.table.read.HoodieFileGroupReader;
-import org.apache.hudi.common.util.CollectionUtils;
-import org.apache.hudi.common.util.ConfigUtils;
-import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.ClosableIterator;
-import org.apache.hudi.common.util.collection.ExternalSpillableMap;
-import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.source.ExpressionPredicates.Predicate;
-import org.apache.hudi.storage.HoodieStorage;
-import org.apache.hudi.storage.HoodieStorageUtils;
-import org.apache.hudi.storage.StoragePath;
-import org.apache.hudi.table.format.FlinkReaderContextFactory;
-import org.apache.hudi.table.format.FormatUtils;
 import org.apache.hudi.table.format.InternalSchemaManager;
 import org.apache.hudi.table.format.mor.MergeOnReadInputFormat;
 import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
 import org.apache.hudi.table.format.mor.MergeOnReadTableState;
-import org.apache.hudi.util.AvroToRowDataConverters;
 import org.apache.hudi.util.FlinkWriteClients;
-import org.apache.hudi.util.RowDataProjection;
-import org.apache.hudi.util.StreamerUtil;
 
-import lombok.extern.slf4j.Slf4j;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
 import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.types.RowKind;
 import org.apache.hadoop.fs.Path;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
 import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
 import java.util.function.Function;
-import java.util.stream.Collectors;
-
-import static 
org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS;
-import static org.apache.hudi.table.format.FormatUtils.buildAvroRecordBySchema;
 
 /**
  * The base InputFormat class to read Hoodie data set as change logs.
  */
-@Slf4j
 public class CdcInputFormat extends MergeOnReadInputFormat {
   private static final long serialVersionUID = 1L;
 
@@ -121,17 +68,20 @@ public class CdcInputFormat extends MergeOnReadInputFormat 
{
   protected ClosableIterator<RowData> initIterator(MergeOnReadInputSplit 
split) throws IOException {
     if (split instanceof CdcInputSplit) {
       HoodieCDCSupplementalLoggingMode mode = 
OptionsResolver.getCDCSupplementalLoggingMode(conf);
-      ImageManager manager = new ImageManager(conf, tableState.getRowType(), 
this::getFileSliceIterator);
+      CdcImageManager manager = new CdcImageManager(
+          tableState.getRowType(),
+          FlinkWriteClients.getHoodieClientConfig(conf),
+          this::getFileSliceIterator);
       Function<HoodieCDCFileSplit, ClosableIterator<RowData>> 
recordIteratorFunc =
-          cdcFileSplit -> getRecordIteratorV2(split.getTablePath(), 
split.getMaxCompactionMemoryInBytes(), cdcFileSplit, mode, manager);
-      return new CdcFileSplitsIterator((CdcInputSplit) split, manager, 
recordIteratorFunc);
+          cdcFileSplit -> getRecordIteratorSafe(split.getTablePath(), 
split.getMaxCompactionMemoryInBytes(), cdcFileSplit, mode, manager);
+      return new CdcIterators.CdcFileSplitsIterator(((CdcInputSplit) 
split).getChanges(), manager, recordIteratorFunc);
     } else {
       return super.initIterator(split);
     }
   }
 
   /**
-   * Returns the builder for {@link MergeOnReadInputFormat}.
+   * Returns the builder for {@link CdcInputFormat}.
    */
   public static Builder builder() {
     return new Builder();
@@ -139,22 +89,21 @@ public class CdcInputFormat extends MergeOnReadInputFormat 
{
 
   private ClosableIterator<RowData> getFileSliceIterator(MergeOnReadInputSplit 
split) {
     try {
-      // get full schema iterator.
       final HoodieSchema schema = HoodieSchemaCache.intern(
           HoodieSchema.parse(tableState.getTableSchema()));
-      // before/after images have assumption of snapshot scan, so `emitDelete` 
is set as false
+      // before/after images use snapshot scan semantics, so emitDelete is 
false
       return getSplitRowIterator(split, schema, schema, 
FlinkOptions.REALTIME_PAYLOAD_COMBINE, false);
     } catch (IOException e) {
       throw new HoodieException("Failed to create iterator for split: " + 
split, e);
     }
   }
 
-  private ClosableIterator<RowData> getRecordIteratorV2(
+  private ClosableIterator<RowData> getRecordIteratorSafe(
       String tablePath,
       long maxCompactionMemoryInBytes,
       HoodieCDCFileSplit fileSplit,
       HoodieCDCSupplementalLoggingMode mode,
-      ImageManager imageManager) {
+      CdcImageManager imageManager) {
     try {
       return getRecordIterator(tablePath, maxCompactionMemoryInBytes, 
fileSplit, mode, imageManager);
     } catch (IOException e) {
@@ -167,41 +116,56 @@ public class CdcInputFormat extends 
MergeOnReadInputFormat {
       long maxCompactionMemoryInBytes,
       HoodieCDCFileSplit fileSplit,
       HoodieCDCSupplementalLoggingMode mode,
-      ImageManager imageManager) throws IOException {
+      CdcImageManager imageManager) throws IOException {
     switch (fileSplit.getCdcInferCase()) {
       case BASE_FILE_INSERT:
         ValidationUtils.checkState(fileSplit.getCdcFiles() != null && 
fileSplit.getCdcFiles().size() == 1,
             "CDC file path should exist and be singleton");
         String path = new Path(tablePath, 
fileSplit.getCdcFiles().get(0)).toString();
-        return new AddBaseFileIterator(getBaseFileIterator(path));
+        return new CdcIterators.AddBaseFileIterator(getBaseFileIterator(path));
       case BASE_FILE_DELETE:
         ValidationUtils.checkState(fileSplit.getBeforeFileSlice().isPresent(),
             "Before file slice should exist");
         FileSlice fileSlice = fileSplit.getBeforeFileSlice().get();
-        MergeOnReadInputSplit inputSplit = fileSlice2Split(tablePath, 
fileSlice, maxCompactionMemoryInBytes);
-        return new RemoveBaseFileIterator(tableState, 
getFileSliceIterator(inputSplit));
-      case AS_IS:
-        HoodieSchema dataSchema = 
HoodieSchemaUtils.removeMetadataFields(HoodieSchema.parse(tableState.getTableSchema()));
+        MergeOnReadInputSplit inputSplit = 
CdcIterators.fileSlice2Split(tablePath, fileSlice, maxCompactionMemoryInBytes);
+        return new CdcIterators.RemoveBaseFileIterator(
+            tableState.getRequiredRowType(), 
tableState.getRequiredPositions(), getFileSliceIterator(inputSplit));
+      case AS_IS: {
+        HoodieSchema tblSchema = 
HoodieSchema.parse(tableState.getTableSchema());
+        HoodieSchema reqSchema = 
HoodieSchema.parse(tableState.getRequiredSchema());
+        HoodieSchema dataSchema = 
HoodieSchemaUtils.removeMetadataFields(tblSchema);
         HoodieSchema cdcSchema = 
HoodieCDCUtils.schemaBySupplementalLoggingMode(mode, dataSchema);
         switch (mode) {
           case DATA_BEFORE_AFTER:
-            return new BeforeAfterImageIterator(tablePath, tableState, 
hadoopConf, cdcSchema, fileSplit);
+            return new CdcIterators.BeforeAfterImageIterator(
+                hadoopConf, tablePath, tblSchema, reqSchema, 
tableState.getRequiredRowType(), cdcSchema, fileSplit);
           case DATA_BEFORE:
-            return new BeforeImageIterator(conf, hadoopConf, tablePath, 
tableState, cdcSchema, fileSplit, imageManager);
+            return new CdcIterators.BeforeImageIterator(
+                hadoopConf, tablePath, tblSchema, reqSchema, 
tableState.getRequiredRowType(),
+                tableState.getRequiredPositions(), maxCompactionMemoryInBytes, 
cdcSchema, fileSplit, imageManager);
           case OP_KEY_ONLY:
-            return new RecordKeyImageIterator(conf, hadoopConf, tablePath, 
tableState, cdcSchema, fileSplit, imageManager);
+            return new CdcIterators.RecordKeyImageIterator(
+                hadoopConf, tablePath, tblSchema, reqSchema, 
tableState.getRequiredRowType(),
+                tableState.getRequiredPositions(), maxCompactionMemoryInBytes, 
cdcSchema, fileSplit, imageManager);
           default:
-            throw new AssertionError("Unexpected mode" + mode);
+            throw new AssertionError("Unexpected mode: " + mode);
         }
+      }
       case LOG_FILE:
         ValidationUtils.checkState(fileSplit.getCdcFiles() != null && 
fileSplit.getCdcFiles().size() == 1,
             "CDC file path should exist and be singleton");
         String logFilepath = new Path(tablePath, 
fileSplit.getCdcFiles().get(0)).toString();
-        MergeOnReadInputSplit split = singleLogFile2Split(tablePath, 
logFilepath, maxCompactionMemoryInBytes);
+        MergeOnReadInputSplit split = 
CdcIterators.singleLogFile2Split(tablePath, logFilepath, 
maxCompactionMemoryInBytes);
         ClosableIterator<HoodieRecord<RowData>> recordIterator = 
getSplitRecordIterator(split);
-        return new DataLogFileIterator(maxCompactionMemoryInBytes, 
imageManager, fileSplit, tableState, recordIterator, metaClient);
+        return new CdcIterators.DataLogFileIterator(
+            maxCompactionMemoryInBytes, imageManager, fileSplit,
+            HoodieSchema.parse(tableState.getTableSchema()),
+            tableState.getRequiredRowType(), tableState.getRequiredPositions(),
+            recordIterator, metaClient, imageManager.getWriteConfig());
       case REPLACE_COMMIT:
-        return new ReplaceCommitIterator(conf, tablePath, tableState, 
fileSplit, this::getFileSliceIterator);
+        return new CdcIterators.ReplaceCommitIterator(
+            tablePath, tableState.getRequiredRowType(), 
tableState.getRequiredPositions(),
+            maxCompactionMemoryInBytes, fileSplit, this::getFileSliceIterator);
       default:
         throw new AssertionError("Unexpected cdc file split infer case: " + 
fileSplit.getCdcInferCase());
     }
@@ -222,656 +186,6 @@ public class CdcInputFormat extends 
MergeOnReadInputFormat {
     return fileGroupReader.getClosableHoodieRecordIterator();
   }
 
-  // -------------------------------------------------------------------------
-  //  Inner Class
-  // -------------------------------------------------------------------------
-  static class CdcFileSplitsIterator implements ClosableIterator<RowData> {
-    private ImageManager imageManager; //  keep a reference to release resource
-    private final Iterator<HoodieCDCFileSplit> fileSplitIterator;
-    private final Function<HoodieCDCFileSplit, ClosableIterator<RowData>> 
recordIteratorFunc;
-    private ClosableIterator<RowData> recordIterator;
-
-    CdcFileSplitsIterator(
-        CdcInputSplit inputSplit,
-        ImageManager imageManager,
-        Function<HoodieCDCFileSplit, ClosableIterator<RowData>> 
recordIteratorFunc) {
-      this.fileSplitIterator = 
Arrays.asList(inputSplit.getChanges()).iterator();
-      this.imageManager = imageManager;
-      this.recordIteratorFunc = recordIteratorFunc;
-    }
-
-    @Override
-    public boolean hasNext() {
-      if (recordIterator != null) {
-        if (recordIterator.hasNext()) {
-          return true;
-        } else {
-          recordIterator.close(); // release resource
-          recordIterator = null;
-        }
-      }
-      if (fileSplitIterator.hasNext()) {
-        HoodieCDCFileSplit fileSplit = fileSplitIterator.next();
-        recordIterator = recordIteratorFunc.apply(fileSplit);
-        return recordIterator.hasNext();
-      }
-      return false;
-    }
-
-    @Override
-    public RowData next() {
-      return recordIterator.next();
-    }
-
-    @Override
-    public void close() {
-      if (recordIterator != null) {
-        recordIterator.close();
-      }
-      if (imageManager != null) {
-        imageManager.close();
-        imageManager = null;
-      }
-    }
-  }
-
-  static class AddBaseFileIterator implements ClosableIterator<RowData> {
-    // base file record iterator
-    private ClosableIterator<RowData> nested;
-
-    private RowData currentRecord;
-
-    AddBaseFileIterator(ClosableIterator<RowData> nested) {
-      this.nested = nested;
-    }
-
-    @Override
-    public boolean hasNext() {
-      if (this.nested.hasNext()) {
-        currentRecord = this.nested.next();
-        currentRecord.setRowKind(RowKind.INSERT);
-        return true;
-      }
-      return false;
-    }
-
-    @Override
-    public RowData next() {
-      return currentRecord;
-    }
-
-    @Override
-    public void close() {
-      if (this.nested != null) {
-        this.nested.close();
-        this.nested = null;
-      }
-    }
-  }
-
-  static class RemoveBaseFileIterator implements ClosableIterator<RowData> {
-    private ClosableIterator<RowData> nested;
-    private final RowDataProjection projection;
-
-    RemoveBaseFileIterator(MergeOnReadTableState tableState, 
ClosableIterator<RowData> iterator) {
-      this.nested = iterator;
-      this.projection = 
RowDataProjection.instance(tableState.getRequiredRowType(), 
tableState.getRequiredPositions());
-    }
-
-    @Override
-    public boolean hasNext() {
-      return nested.hasNext();
-    }
-
-    @Override
-    public RowData next() {
-      RowData row = nested.next();
-      row.setRowKind(RowKind.DELETE);
-      return this.projection.project(row);
-    }
-
-    @Override
-    public void close() {
-      if (this.nested != null) {
-        this.nested.close();
-        this.nested = null;
-      }
-    }
-  }
-
-  // accounting to HoodieCDCInferenceCase.LOG_FILE
-  static class DataLogFileIterator implements ClosableIterator<RowData> {
-    private final HoodieSchema tableSchema;
-    private final long maxCompactionMemoryInBytes;
-    private final ImageManager imageManager;
-    private final RowDataProjection projection;
-    private final BufferedRecordMerger recordMerger;
-    private final ClosableIterator<HoodieRecord<RowData>> logRecordIterator;
-    private final DeleteContext deleteContext;
-
-    private ExternalSpillableMap<String, byte[]> beforeImages;
-    private RowData currentImage;
-    private RowData sideImage;
-    private HoodieReaderContext<RowData> readerContext;
-    private String[] orderingFields;
-    private TypedProperties props;
-
-    DataLogFileIterator(
-        long maxCompactionMemoryInBytes,
-        ImageManager imageManager,
-        HoodieCDCFileSplit cdcFileSplit,
-        MergeOnReadTableState tableState,
-        ClosableIterator<HoodieRecord<RowData>> logRecordIterator,
-        HoodieTableMetaClient metaClient) throws IOException {
-      this.tableSchema = HoodieSchema.parse(tableState.getTableSchema());
-      this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes;
-      this.imageManager = imageManager;
-      this.projection = 
tableState.getRequiredRowType().equals(tableState.getRowType())
-          ? null
-          : RowDataProjection.instance(tableState.getRequiredRowType(), 
tableState.getRequiredPositions());
-      HoodieWriteConfig writeConfig = this.imageManager.writeConfig;
-      this.props = writeConfig.getProps();
-      this.readerContext = new 
FlinkReaderContextFactory(metaClient).getContext();
-      readerContext.initRecordMerger(props);
-      this.orderingFields = ConfigUtils.getOrderingFields(props);
-      this.recordMerger = BufferedRecordMergerFactory.create(
-          readerContext,
-          readerContext.getMergeMode(),
-          false,
-          Option.of(imageManager.writeConfig.getRecordMerger()),
-          tableSchema,
-          
Option.ofNullable(Pair.of(metaClient.getTableConfig().getPayloadClass(), 
writeConfig.getPayloadClass())),
-          props,
-          metaClient.getTableConfig().getPartialUpdateMode());
-      this.logRecordIterator = logRecordIterator;
-      initImages(cdcFileSplit);
-      this.deleteContext = new DeleteContext(props, 
tableSchema).withReaderSchema(tableSchema);
-    }
-
-    private void initImages(HoodieCDCFileSplit fileSplit) throws IOException {
-      // init before images
-      if (fileSplit.getBeforeFileSlice().isPresent() && 
!fileSplit.getBeforeFileSlice().get().isEmpty()) {
-        this.beforeImages = this.imageManager.getOrLoadImages(
-            maxCompactionMemoryInBytes, fileSplit.getBeforeFileSlice().get());
-      } else {
-        // still initializes an empty map
-        this.beforeImages = 
FormatUtils.spillableMap(this.imageManager.writeConfig, 
maxCompactionMemoryInBytes, getClass().getSimpleName());
-      }
-    }
-
-    @Override
-    public boolean hasNext() {
-      if (this.sideImage != null) {
-        this.currentImage = this.sideImage;
-        this.sideImage = null;
-        return true;
-      }
-      while (logRecordIterator.hasNext()) {
-        HoodieRecord<RowData> record = logRecordIterator.next();
-        RowData existed = 
imageManager.removeImageRecord(record.getRecordKey(), beforeImages);
-        if (isDelete(record)) {
-          // it's a deleted record.
-          if (existed != null) {
-            // there is a real record deleted.
-            existed.setRowKind(RowKind.DELETE);
-            this.currentImage = existed;
-            return true;
-          }
-        } else {
-          if (existed == null) {
-            // a new record is inserted.
-            RowData newRow = record.getData();
-            newRow.setRowKind(RowKind.INSERT);
-            this.currentImage = newRow;
-            return true;
-          } else {
-            // an existed record is updated, assuming new record and existing 
record share the same hoodie key
-            HoodieOperation operation = 
HoodieOperation.fromValue(existed.getRowKind().toByteValue());
-            HoodieRecord<RowData> historyRecord = new 
HoodieFlinkRecord(record.getKey(), operation, existed);
-            HoodieRecord<RowData> merged = mergeRowWithLog(historyRecord, 
record).get();
-            if (merged.getData() != existed) {
-              // update happens
-              existed.setRowKind(RowKind.UPDATE_BEFORE);
-              this.currentImage = existed;
-              RowData mergedRow = merged.getData();
-              mergedRow.setRowKind(RowKind.UPDATE_AFTER);
-              this.imageManager.updateImageRecord(record.getRecordKey(), 
beforeImages, mergedRow);
-              this.sideImage = mergedRow;
-
-              return true;
-            }
-          }
-        }
-      }
-      return false;
-    }
-
-    @Override
-    public RowData next() {
-      return this.projection != null ? 
this.projection.project(this.currentImage) : this.currentImage;
-    }
-
-    @Override
-    public void close() {
-      this.logRecordIterator.close();
-      this.imageManager.close();
-    }
-
-    @SuppressWarnings("unchecked")
-    private Option<HoodieRecord<RowData>> 
mergeRowWithLog(HoodieRecord<RowData> historyRecord, HoodieRecord<RowData> 
newRecord) {
-      try {
-        BufferedRecord<RowData> historyBufferedRecord = 
BufferedRecords.fromHoodieRecord(historyRecord, tableSchema, 
readerContext.getRecordContext(), props, orderingFields, deleteContext);
-        BufferedRecord<RowData> newBufferedRecord = 
BufferedRecords.fromHoodieRecord(newRecord, tableSchema, 
readerContext.getRecordContext(), props, orderingFields, deleteContext);
-        BufferedRecord<RowData> mergedRecord = 
recordMerger.finalMerge(historyBufferedRecord, newBufferedRecord);
-        return 
Option.ofNullable(readerContext.getRecordContext().constructHoodieRecord(mergedRecord,
 historyRecord.getPartitionPath()));
-      } catch (IOException e) {
-        throw new HoodieIOException("Merge base and delta payloads exception", 
e);
-      }
-    }
-
-    private boolean isDelete(HoodieRecord<RowData> record) {
-      return record.isDelete(deleteContext, CollectionUtils.emptyProps());
-    }
-  }
-
-  abstract static class BaseImageIterator implements ClosableIterator<RowData> 
{
-    private final HoodieSchema requiredSchema;
-    private final int[] requiredPos;
-    private final GenericRecordBuilder recordBuilder;
-    private final AvroToRowDataConverters.AvroToRowDataConverter 
avroToRowDataConverter;
-
-    // the changelog records iterator
-    private HoodieCDCLogRecordIterator cdcItr;
-
-    private GenericRecord cdcRecord;
-
-    private RowData sideImage;
-
-    private RowData currentImage;
-
-    BaseImageIterator(
-        org.apache.hadoop.conf.Configuration hadoopConf,
-        String tablePath,
-        MergeOnReadTableState tableState,
-        HoodieSchema cdcSchema,
-        HoodieCDCFileSplit fileSplit) {
-      this.requiredSchema = HoodieSchema.parse(tableState.getRequiredSchema());
-      this.requiredPos = getRequiredPos(tableState.getTableSchema(), 
this.requiredSchema);
-      this.recordBuilder = new 
GenericRecordBuilder(requiredSchema.getAvroSchema());
-      this.avroToRowDataConverter = 
AvroToRowDataConverters.createRowConverter(tableState.getRequiredRowType());
-      StoragePath hadoopTablePath = new StoragePath(tablePath);
-      HoodieStorage storage = HoodieStorageUtils.getStorage(tablePath, 
HadoopFSUtils.getStorageConf(hadoopConf));
-      HoodieLogFile[] cdcLogFiles = 
fileSplit.getCdcFiles().stream().map(cdcFile -> {
-        try {
-          return new HoodieLogFile(
-              storage.getPathInfo(new StoragePath(hadoopTablePath, cdcFile)));
-        } catch (IOException e) {
-          throw new HoodieIOException("Fail to call getFileStatus", e);
-        }
-      }).toArray(HoodieLogFile[]::new);
-      this.cdcItr = new HoodieCDCLogRecordIterator(storage, cdcLogFiles, 
cdcSchema);
-    }
-
-    private int[] getRequiredPos(String tableSchema, HoodieSchema required) {
-      HoodieSchema dataSchema = 
HoodieSchemaUtils.removeMetadataFields(HoodieSchema.parse(tableSchema));
-      List<String> fields = 
dataSchema.getFields().stream().map(HoodieSchemaField::name).collect(Collectors.toList());
-      return required.getFields().stream()
-          .map(f -> fields.indexOf(f.name()))
-          .mapToInt(i -> i)
-          .toArray();
-    }
-
-    @Override
-    public boolean hasNext() {
-      if (this.sideImage != null) {
-        currentImage = this.sideImage;
-        this.sideImage = null;
-        return true;
-      } else if (this.cdcItr.hasNext()) {
-        cdcRecord = (GenericRecord) this.cdcItr.next();
-        String op = String.valueOf(cdcRecord.get(0));
-        resolveImage(op);
-        return true;
-      }
-      return false;
-    }
-
-    protected abstract RowData getAfterImage(RowKind rowKind, GenericRecord 
cdcRecord);
-
-    protected abstract RowData getBeforeImage(RowKind rowKind, GenericRecord 
cdcRecord);
-
-    @Override
-    public RowData next() {
-      return currentImage;
-    }
-
-    @Override
-    public void close() {
-      if (this.cdcItr != null) {
-        this.cdcItr.close();
-        this.cdcItr = null;
-      }
-    }
-
-    private void resolveImage(String op) {
-      switch (op) {
-        case "i":
-          currentImage = getAfterImage(RowKind.INSERT, cdcRecord);
-          break;
-        case "u":
-          currentImage = getBeforeImage(RowKind.UPDATE_BEFORE, cdcRecord);
-          sideImage = getAfterImage(RowKind.UPDATE_AFTER, cdcRecord);
-          break;
-        case "d":
-          currentImage = getBeforeImage(RowKind.DELETE, cdcRecord);
-          break;
-        default:
-          throw new AssertionError("Unexpected");
-      }
-    }
-
-    protected RowData resolveAvro(RowKind rowKind, GenericRecord avroRecord) {
-      GenericRecord requiredAvroRecord = buildAvroRecordBySchema(
-          avroRecord,
-          requiredSchema,
-          requiredPos,
-          recordBuilder);
-      RowData resolved = (RowData) 
avroToRowDataConverter.convert(requiredAvroRecord);
-      resolved.setRowKind(rowKind);
-      return resolved;
-    }
-  }
-
-  // op, ts, before_image, after_image
-  static class BeforeAfterImageIterator extends BaseImageIterator {
-    BeforeAfterImageIterator(
-        String tablePath,
-        MergeOnReadTableState tableState,
-        org.apache.hadoop.conf.Configuration hadoopConf,
-        HoodieSchema cdcSchema,
-        HoodieCDCFileSplit fileSplit) {
-      super(hadoopConf, tablePath, tableState, cdcSchema, fileSplit);
-    }
-
-    @Override
-    protected RowData getAfterImage(RowKind rowKind, GenericRecord cdcRecord) {
-      return resolveAvro(rowKind, (GenericRecord) cdcRecord.get(3));
-    }
-
-    @Override
-    protected RowData getBeforeImage(RowKind rowKind, GenericRecord cdcRecord) 
{
-      return resolveAvro(rowKind, (GenericRecord) cdcRecord.get(2));
-    }
-  }
-
-  // op, key, before_image
-  static class BeforeImageIterator extends BaseImageIterator {
-    protected ExternalSpillableMap<String, byte[]> afterImages;
-
-    protected final long maxCompactionMemoryInBytes;
-
-    protected final RowDataProjection projection;
-    protected final ImageManager imageManager;
-
-    BeforeImageIterator(
-        Configuration flinkConf,
-        org.apache.hadoop.conf.Configuration hadoopConf,
-        String tablePath,
-        MergeOnReadTableState tableState,
-        HoodieSchema cdcSchema,
-        HoodieCDCFileSplit fileSplit,
-        ImageManager imageManager) throws IOException {
-      super(hadoopConf, tablePath, tableState, cdcSchema, fileSplit);
-      this.maxCompactionMemoryInBytes = 
StreamerUtil.getMaxCompactionMemoryInBytes(flinkConf);
-      this.projection = 
RowDataProjection.instance(tableState.getRequiredRowType(), 
tableState.getRequiredPositions());
-      this.imageManager = imageManager;
-      initImages(fileSplit);
-    }
-
-    protected void initImages(
-        HoodieCDCFileSplit fileSplit) throws IOException {
-      ValidationUtils.checkState(fileSplit.getAfterFileSlice().isPresent(),
-          "Current file slice does not exist for instant: " + 
fileSplit.getInstant());
-      this.afterImages = this.imageManager.getOrLoadImages(
-          maxCompactionMemoryInBytes, fileSplit.getAfterFileSlice().get());
-    }
-
-    @Override
-    protected RowData getAfterImage(RowKind rowKind, GenericRecord cdcRecord) {
-      String recordKey = cdcRecord.get(1).toString();
-      RowData row = imageManager.getImageRecord(recordKey, this.afterImages, 
rowKind);
-      row.setRowKind(rowKind);
-      return this.projection.project(row);
-    }
-
-    @Override
-    protected RowData getBeforeImage(RowKind rowKind, GenericRecord cdcRecord) 
{
-      return resolveAvro(rowKind, (GenericRecord) cdcRecord.get(2));
-    }
-  }
-
-  // op, key
-  static class RecordKeyImageIterator extends BeforeImageIterator {
-    protected ExternalSpillableMap<String, byte[]> beforeImages;
-
-    RecordKeyImageIterator(
-        Configuration flinkConf,
-        org.apache.hadoop.conf.Configuration hadoopConf,
-        String tablePath,
-        MergeOnReadTableState tableState,
-        HoodieSchema cdcSchema,
-        HoodieCDCFileSplit fileSplit,
-        ImageManager imageManager) throws IOException {
-      super(flinkConf, hadoopConf, tablePath, tableState, cdcSchema, 
fileSplit, imageManager);
-    }
-
-    protected void initImages(HoodieCDCFileSplit fileSplit) throws IOException 
{
-      // init after images
-      super.initImages(fileSplit);
-      // init before images
-      ValidationUtils.checkState(fileSplit.getBeforeFileSlice().isPresent(),
-          "Before file slice does not exist for instant: " + 
fileSplit.getInstant());
-      this.beforeImages = this.imageManager.getOrLoadImages(
-          maxCompactionMemoryInBytes, fileSplit.getBeforeFileSlice().get());
-    }
-
-    @Override
-    protected RowData getBeforeImage(RowKind rowKind, GenericRecord cdcRecord) 
{
-      String recordKey = cdcRecord.get(1).toString();
-      RowData row = this.imageManager.getImageRecord(recordKey, 
this.beforeImages, rowKind);
-      row.setRowKind(rowKind);
-      return this.projection.project(row);
-    }
-  }
-
-  static class ReplaceCommitIterator implements ClosableIterator<RowData> {
-    private final ClosableIterator<RowData> itr;
-    private final RowDataProjection projection;
-
-    ReplaceCommitIterator(
-        Configuration flinkConf,
-        String tablePath,
-        MergeOnReadTableState tableState,
-        HoodieCDCFileSplit fileSplit,
-        Function<MergeOnReadInputSplit, ClosableIterator<RowData>> 
splitIteratorFunc) {
-      this.itr = initIterator(tablePath, 
StreamerUtil.getMaxCompactionMemoryInBytes(flinkConf), fileSplit, 
splitIteratorFunc);
-      this.projection = 
RowDataProjection.instance(tableState.getRequiredRowType(), 
tableState.getRequiredPositions());
-    }
-
-    private ClosableIterator<RowData> initIterator(
-        String tablePath,
-        long maxCompactionMemoryInBytes,
-        HoodieCDCFileSplit fileSplit,
-        Function<MergeOnReadInputSplit, ClosableIterator<RowData>> 
splitIteratorFunc) {
-      // init before images
-
-      // the before file slice must exist,
-      // see HoodieCDCExtractor#extractCDCFileSplits for details
-      ValidationUtils.checkState(fileSplit.getBeforeFileSlice().isPresent(),
-          "Before file slice does not exist for instant: " + 
fileSplit.getInstant());
-      MergeOnReadInputSplit inputSplit = CdcInputFormat.fileSlice2Split(
-          tablePath, fileSplit.getBeforeFileSlice().get(), 
maxCompactionMemoryInBytes);
-      return splitIteratorFunc.apply(inputSplit);
-    }
-
-    @Override
-    public boolean hasNext() {
-      return this.itr.hasNext();
-    }
-
-    @Override
-    public RowData next() {
-      RowData row = this.itr.next();
-      row.setRowKind(RowKind.DELETE);
-      return this.projection.project(row);
-    }
-
-    @Override
-    public void close() {
-      this.itr.close();
-    }
-  }
-
-  public static final class BytesArrayInputView extends DataInputStream 
implements DataInputView {
-    public BytesArrayInputView(byte[] data) {
-      super(new ByteArrayInputStream(data));
-    }
-
-    public void skipBytesToRead(int numBytes) throws IOException {
-      while (numBytes > 0) {
-        int skipped = this.skipBytes(numBytes);
-        numBytes -= skipped;
-      }
-    }
-  }
-
-  public static final class BytesArrayOutputView extends DataOutputStream 
implements DataOutputView {
-    public BytesArrayOutputView(ByteArrayOutputStream baos) {
-      super(baos);
-    }
-
-    public void skipBytesToWrite(int numBytes) throws IOException {
-      for (int i = 0; i < numBytes; ++i) {
-        this.write(0);
-      }
-    }
-
-    public void write(DataInputView source, int numBytes) throws IOException {
-      byte[] buffer = new byte[numBytes];
-      source.readFully(buffer);
-      this.write(buffer);
-    }
-  }
-
-  /**
-   * A before/after image manager
-   * that caches the image records by versions(file slices).
-   */
-  private static class ImageManager implements AutoCloseable {
-    private final HoodieWriteConfig writeConfig;
-
-    private final RowDataSerializer serializer;
-    private final Function<MergeOnReadInputSplit, ClosableIterator<RowData>> 
splitIteratorFunc;
-
-    private final Map<String, ExternalSpillableMap<String, byte[]>> cache;
-
-    public ImageManager(
-        Configuration flinkConf,
-        RowType rowType,
-        Function<MergeOnReadInputSplit, ClosableIterator<RowData>> 
splitIteratorFunc) {
-      this.serializer = new RowDataSerializer(rowType);
-      this.splitIteratorFunc = splitIteratorFunc;
-      this.cache = new TreeMap<>();
-      this.writeConfig = FlinkWriteClients.getHoodieClientConfig(flinkConf);
-    }
-
-    public ExternalSpillableMap<String, byte[]> getOrLoadImages(
-        long maxCompactionMemoryInBytes,
-        FileSlice fileSlice) throws IOException {
-      final String instant = fileSlice.getBaseInstantTime();
-      if (this.cache.containsKey(instant)) {
-        return cache.get(instant);
-      }
-      // clean the earliest file slice first
-      if (this.cache.size() > 1) {
-        // keep at most 2 versions: before & after
-        String instantToClean = this.cache.keySet().iterator().next();
-        this.cache.remove(instantToClean).close();
-      }
-      ExternalSpillableMap<String, byte[]> images = 
loadImageRecords(maxCompactionMemoryInBytes, fileSlice);
-      this.cache.put(instant, images);
-      return images;
-    }
-
-    private ExternalSpillableMap<String, byte[]> loadImageRecords(
-        long maxCompactionMemoryInBytes,
-        FileSlice fileSlice) throws IOException {
-      MergeOnReadInputSplit inputSplit = 
CdcInputFormat.fileSlice2Split(writeConfig.getBasePath(), fileSlice, 
maxCompactionMemoryInBytes);
-      // initialize the image records map
-      ExternalSpillableMap<String, byte[]> imageRecordsMap =
-          FormatUtils.spillableMap(writeConfig, maxCompactionMemoryInBytes, 
getClass().getSimpleName());
-      try (ClosableIterator<RowData> itr = 
splitIteratorFunc.apply(inputSplit)) {
-        while (itr.hasNext()) {
-          RowData row = itr.next();
-          String recordKey = 
row.getString(HOODIE_RECORD_KEY_COL_POS).toString();
-          ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
-          serializer.serialize(row, new BytesArrayOutputView(baos));
-          imageRecordsMap.put(recordKey, baos.toByteArray());
-        }
-      }
-      return imageRecordsMap;
-    }
-
-    public RowData getImageRecord(
-        String recordKey,
-        ExternalSpillableMap<String, byte[]> cache,
-        RowKind rowKind) {
-      byte[] bytes = cache.get(recordKey);
-      ValidationUtils.checkState(bytes != null,
-          "Key " + recordKey + " does not exist in current file group");
-      try {
-        RowData row = serializer.deserialize(new BytesArrayInputView(bytes));
-        row.setRowKind(rowKind);
-        return row;
-      } catch (IOException e) {
-        throw new HoodieException("Deserialize bytes into row data exception", 
e);
-      }
-    }
-
-    public void updateImageRecord(
-        String recordKey,
-        ExternalSpillableMap<String, byte[]> cache,
-        RowData row) {
-      ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
-      try {
-        serializer.serialize(row, new BytesArrayOutputView(baos));
-      } catch (IOException e) {
-        throw new HoodieException("Serialize row data into bytes exception", 
e);
-      }
-      cache.put(recordKey, baos.toByteArray());
-    }
-
-    public RowData removeImageRecord(
-        String recordKey,
-        ExternalSpillableMap<String, byte[]> cache) {
-      byte[] bytes = cache.remove(recordKey);
-      if (bytes == null) {
-        return null;
-      }
-      try {
-        return serializer.deserialize(new BytesArrayInputView(bytes));
-      } catch (IOException e) {
-        throw new HoodieException("Deserialize bytes into row data exception", 
e);
-      }
-    }
-
-    @Override
-    public void close() {
-      this.cache.values().forEach(ExternalSpillableMap::close);
-      this.cache.clear();
-    }
-  }
-
   /**
    * Builder for {@link CdcInputFormat}.
    */
@@ -911,30 +225,4 @@ public class CdcInputFormat extends MergeOnReadInputFormat 
{
       return new CdcInputFormat(conf, tableState, fieldTypes, predicates, 
limit, emitDelete);
     }
   }
-
-  // -------------------------------------------------------------------------
-  //  Utilities
-  // -------------------------------------------------------------------------
-  public static MergeOnReadInputSplit fileSlice2Split(
-      String tablePath,
-      FileSlice fileSlice,
-      long maxCompactionMemoryInBytes) {
-    Option<List<String>> logPaths = Option.ofNullable(fileSlice.getLogFiles()
-        .sorted(HoodieLogFile.getLogFileComparator())
-        .map(logFile -> logFile.getPath().toString())
-        // filter out the cdc logs
-        .filter(path -> !path.endsWith(HoodieCDCUtils.CDC_LOGFILE_SUFFIX))
-        .collect(Collectors.toList()));
-    String basePath = 
fileSlice.getBaseFile().map(BaseFile::getPath).orElse(null);
-    return new MergeOnReadInputSplit(0, basePath, logPaths, 
fileSlice.getLatestInstantTime(),
-        tablePath, maxCompactionMemoryInBytes, 
FlinkOptions.REALTIME_PAYLOAD_COMBINE, null,
-        fileSlice.getFileId(), fileSlice.getPartitionPath());
-  }
-
-  public static MergeOnReadInputSplit singleLogFile2Split(String tablePath, 
String filePath, long maxCompactionMemoryInBytes) {
-    return new MergeOnReadInputSplit(0, null, 
Option.of(Collections.singletonList(filePath)),
-        FSUtils.getDeltaCommitTimeFromLogPath(new StoragePath(filePath)), 
tablePath, maxCompactionMemoryInBytes,
-        FlinkOptions.REALTIME_PAYLOAD_COMBINE, null, 
FSUtils.getFileIdFromLogPath(new StoragePath(filePath)),
-        FSUtils.getRelativePartitionPath(new StoragePath(tablePath), new 
StoragePath(filePath).getParent()));
-  }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcIterators.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcIterators.java
new file mode 100644
index 000000000000..5f4781494217
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcIterators.java
@@ -0,0 +1,667 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cdc;
+
+import org.apache.hudi.client.model.HoodieFlinkRecord;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.BaseFile;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieOperation;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaUtils;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.cdc.HoodieCDCFileSplit;
+import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
+import org.apache.hudi.common.table.log.HoodieCDCLogRecordIterator;
+import org.apache.hudi.common.table.read.BufferedRecord;
+import org.apache.hudi.common.table.read.BufferedRecordMerger;
+import org.apache.hudi.common.table.read.BufferedRecordMergerFactory;
+import org.apache.hudi.common.table.read.BufferedRecords;
+import org.apache.hudi.common.table.read.DeleteContext;
+import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.common.util.ConfigUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.ExternalSpillableMap;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.HoodieStorageUtils;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.table.format.FlinkReaderContextFactory;
+import org.apache.hudi.table.format.FormatUtils;
+import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
+import org.apache.hudi.util.AvroToRowDataConverters;
+import org.apache.hudi.util.HoodieSchemaConverter;
+import org.apache.hudi.util.RowDataProjection;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.table.format.FormatUtils.buildAvroRecordBySchema;
+
+/**
+ * Shared iterator implementations for CDC record reading, used by both
+ * {@link CdcInputFormat} and the Source V2 CDC split reader.
+ */
+public final class CdcIterators {
+
+  private CdcIterators() {
+    // static holder for the CDC iterator implementations; never instantiated
+  }
+
+  // -------------------------------------------------------------------------
+  //  Top-level iterator: fans out over an ordered list of CDC file-splits
+  // -------------------------------------------------------------------------
+
+  /**
+   * Iterates over an ordered sequence of {@link HoodieCDCFileSplit}s, delegating
+   * per-split record reading to a user-supplied factory function.
+   *
+   * <p>Splits whose record iterator yields nothing are skipped: {@link #hasNext()} keeps
+   * advancing through the remaining splits until one produces a record or all splits are
+   * exhausted. Each per-split iterator is closed as soon as it is drained; the shared
+   * {@link CdcImageManager} (if non-null) is closed by {@link #close()}.
+   */
+  public static class CdcFileSplitsIterator implements ClosableIterator<RowData> {
+    private CdcImageManager imageManager;
+    private final Iterator<HoodieCDCFileSplit> fileSplitIterator;
+    private final Function<HoodieCDCFileSplit, ClosableIterator<RowData>> recordIteratorFunc;
+    private ClosableIterator<RowData> recordIterator;
+
+    /**
+     * @param changes            the CDC file splits to read, in reading order
+     * @param imageManager       image manager closed together with this iterator; may be null
+     * @param recordIteratorFunc creates the record iterator for a single CDC file split
+     */
+    public CdcFileSplitsIterator(
+        HoodieCDCFileSplit[] changes,
+        CdcImageManager imageManager,
+        Function<HoodieCDCFileSplit, ClosableIterator<RowData>> recordIteratorFunc) {
+      this.fileSplitIterator = Arrays.asList(changes).iterator();
+      this.imageManager = imageManager;
+      this.recordIteratorFunc = recordIteratorFunc;
+    }
+
+    @Override
+    public boolean hasNext() {
+      while (true) {
+        if (recordIterator != null) {
+          if (recordIterator.hasNext()) {
+            return true;
+          }
+          // the current split is drained: release its resources before moving on
+          recordIterator.close();
+          recordIterator = null;
+        }
+        if (!fileSplitIterator.hasNext()) {
+          return false;
+        }
+        // loop (rather than returning the new iterator's hasNext() directly) so that a split
+        // producing no records does not prematurely end the iteration over later splits
+        recordIterator = recordIteratorFunc.apply(fileSplitIterator.next());
+      }
+    }
+
+    @Override
+    public RowData next() {
+      if (recordIterator == null) {
+        throw new java.util.NoSuchElementException("No more CDC records");
+      }
+      // hasNext() is not re-invoked here: the nested per-split iterators advance on hasNext()
+      return recordIterator.next();
+    }
+
+    @Override
+    public void close() {
+      try {
+        if (recordIterator != null) {
+          recordIterator.close();
+          recordIterator = null;
+        }
+      } finally {
+        if (imageManager != null) {
+          imageManager.close();
+          imageManager = null;
+        }
+      }
+    }
+  }
+
+  // -------------------------------------------------------------------------
+  //  BASE_FILE_INSERT / BASE_FILE_DELETE
+  // -------------------------------------------------------------------------
+
+  /**
+   * Wraps a base-file parquet iterator and marks every record as {@link 
RowKind#INSERT}.
+   * Used for the {@code BASE_FILE_INSERT} CDC inference case.
+   */
+  public static class AddBaseFileIterator implements ClosableIterator<RowData> 
{
+    private ClosableIterator<RowData> nested;
+    private RowData currentRecord;
+
+    public AddBaseFileIterator(ClosableIterator<RowData> nested) {
+      this.nested = nested;
+    }
+
+    @Override
+    public boolean hasNext() {
+      if (nested.hasNext()) {
+        currentRecord = nested.next();
+        currentRecord.setRowKind(RowKind.INSERT);
+        return true;
+      }
+      return false;
+    }
+
+    @Override
+    public RowData next() {
+      return currentRecord;
+    }
+
+    @Override
+    public void close() {
+      if (nested != null) {
+        nested.close();
+        nested = null;
+      }
+    }
+  }
+
+  /**
+   * Emits every row of the wrapped file-slice iterator as a {@link RowKind#DELETE} change,
+   * projected down to the required columns. Serves the {@code BASE_FILE_DELETE} CDC
+   * inference case.
+   */
+  public static class RemoveBaseFileIterator implements ClosableIterator<RowData> {
+    private final RowDataProjection projection;
+    private ClosableIterator<RowData> nested;
+
+    /**
+     * @param requiredRowType   row type of the projected output
+     * @param requiredPositions positions of the required fields within the source rows
+     * @param iterator          source rows to emit as deletes; closed by {@link #close()}
+     */
+    public RemoveBaseFileIterator(
+        RowType requiredRowType,
+        int[] requiredPositions,
+        ClosableIterator<RowData> iterator) {
+      this.projection = RowDataProjection.instance(requiredRowType, requiredPositions);
+      this.nested = iterator;
+    }
+
+    @Override
+    public boolean hasNext() {
+      return nested.hasNext();
+    }
+
+    @Override
+    public RowData next() {
+      final RowData deleted = nested.next();
+      deleted.setRowKind(RowKind.DELETE);
+      return projection.project(deleted);
+    }
+
+    @Override
+    public void close() {
+      if (nested == null) {
+        return;
+      }
+      nested.close();
+      nested = null;
+    }
+  }
+
+  // -------------------------------------------------------------------------
+  //  LOG_FILE
+  // -------------------------------------------------------------------------
+
+  /**
+   * Handles the {@code LOG_FILE} CDC inference case: compares records from the log file against
+   * before-image snapshots to emit INSERT / UPDATE_BEFORE / UPDATE_AFTER / DELETE events.
+   *
+   * <p>For every log record, the before image with the same record key is fetched through
+   * {@code CdcImageManager#removeImageRecord}, and then:
+   * <ul>
+   *   <li>a delete with an existing image emits that image as {@link RowKind#DELETE};</li>
+   *   <li>a non-delete without an image emits the log record as {@link RowKind#INSERT};</li>
+   *   <li>a non-delete with an image is merged into it; if the merge yields a different row
+   *       object, the old image is emitted as {@link RowKind#UPDATE_BEFORE}, followed (on the next
+   *       {@link #hasNext()} call) by the merged row as {@link RowKind#UPDATE_AFTER}, and the merged
+   *       row is written back as the key's image.</li>
+   * </ul>
+   * Log records matching none of these cases emit nothing.
+   */
+  public static class DataLogFileIterator implements ClosableIterator<RowData> {
+    private final HoodieSchema tableSchema;
+    private final long maxCompactionMemoryInBytes;
+    private final CdcImageManager imageManager;
+    // null when the required row type equals the full table row type (no projection needed)
+    private final RowDataProjection projection;
+    private final BufferedRecordMerger recordMerger;
+    private final ClosableIterator<HoodieRecord<RowData>> logRecordIterator;
+    private final DeleteContext deleteContext;
+    private final HoodieReaderContext<RowData> readerContext;
+    private final String[] orderingFields;
+    private final TypedProperties props;
+
+    private ExternalSpillableMap<String, byte[]> beforeImages;
+    // true when beforeImages was created locally rather than obtained from the image manager;
+    // such a map is not tracked by the image manager, so this iterator must close it itself
+    private boolean ownsBeforeImages;
+    private RowData currentImage;
+    // UPDATE_AFTER row buffered to be returned right after its UPDATE_BEFORE row
+    private RowData sideImage;
+
+    public DataLogFileIterator(
+        long maxCompactionMemoryInBytes,
+        CdcImageManager imageManager,
+        HoodieCDCFileSplit cdcFileSplit,
+        HoodieSchema tableSchema,
+        RowType requiredRowType,
+        int[] requiredPositions,
+        ClosableIterator<HoodieRecord<RowData>> logRecordIterator,
+        HoodieTableMetaClient metaClient,
+        HoodieWriteConfig writeConfig) throws IOException {
+      this.tableSchema = tableSchema;
+      this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes;
+      this.imageManager = imageManager;
+      this.projection = HoodieSchemaConverter.convertToRowType(tableSchema).equals(requiredRowType)
+          ? null : RowDataProjection.instance(requiredRowType, requiredPositions);
+      this.props = writeConfig.getProps();
+      this.readerContext = new FlinkReaderContextFactory(metaClient).getContext();
+      readerContext.initRecordMerger(props);
+      this.orderingFields = ConfigUtils.getOrderingFields(props);
+      this.recordMerger = BufferedRecordMergerFactory.create(
+          readerContext,
+          readerContext.getMergeMode(),
+          false,
+          Option.of(writeConfig.getRecordMerger()),
+          tableSchema,
+          Option.ofNullable(Pair.of(metaClient.getTableConfig().getPayloadClass(), writeConfig.getPayloadClass())),
+          props,
+          metaClient.getTableConfig().getPartialUpdateMode());
+      this.logRecordIterator = logRecordIterator;
+      this.deleteContext = new DeleteContext(props, tableSchema).withReaderSchema(tableSchema);
+      initImages(cdcFileSplit, writeConfig);
+    }
+
+    /**
+     * Loads the before images of the split's before file slice through the image manager, or
+     * creates an empty, locally owned spillable map when there is no (non-empty) before slice.
+     */
+    private void initImages(HoodieCDCFileSplit fileSplit, HoodieWriteConfig writeConfig) throws IOException {
+      if (fileSplit.getBeforeFileSlice().isPresent() && !fileSplit.getBeforeFileSlice().get().isEmpty()) {
+        this.beforeImages = imageManager.getOrLoadImages(
+            maxCompactionMemoryInBytes, fileSplit.getBeforeFileSlice().get());
+        this.ownsBeforeImages = false;
+      } else {
+        this.beforeImages = FormatUtils.spillableMap(writeConfig, maxCompactionMemoryInBytes, getClass().getSimpleName());
+        this.ownsBeforeImages = true;
+      }
+    }
+
+    @Override
+    public boolean hasNext() {
+      if (sideImage != null) {
+        // emit the UPDATE_AFTER half of the previous update
+        currentImage = sideImage;
+        sideImage = null;
+        return true;
+      }
+      while (logRecordIterator.hasNext()) {
+        HoodieRecord<RowData> record = logRecordIterator.next();
+        RowData existed = imageManager.removeImageRecord(record.getRecordKey(), beforeImages);
+        if (isDelete(record)) {
+          if (existed != null) {
+            existed.setRowKind(RowKind.DELETE);
+            currentImage = existed;
+            return true;
+          }
+        } else {
+          if (existed == null) {
+            RowData newRow = record.getData();
+            newRow.setRowKind(RowKind.INSERT);
+            currentImage = newRow;
+            return true;
+          } else {
+            HoodieOperation operation = HoodieOperation.fromValue(existed.getRowKind().toByteValue());
+            HoodieRecord<RowData> historyRecord = new HoodieFlinkRecord(record.getKey(), operation, existed);
+            HoodieRecord<RowData> merged = mergeRowWithLog(historyRecord, record).get();
+            // reference comparison: a merge that keeps the existing row means no visible change
+            if (merged.getData() != existed) {
+              existed.setRowKind(RowKind.UPDATE_BEFORE);
+              currentImage = existed;
+              RowData mergedRow = merged.getData();
+              mergedRow.setRowKind(RowKind.UPDATE_AFTER);
+              imageManager.updateImageRecord(record.getRecordKey(), beforeImages, mergedRow);
+              sideImage = mergedRow;
+              return true;
+            }
+          }
+        }
+      }
+      return false;
+    }
+
+    @Override
+    public RowData next() {
+      return projection != null ? projection.project(currentImage) : currentImage;
+    }
+
+    @Override
+    public void close() {
+      try {
+        logRecordIterator.close();
+      } finally {
+        try {
+          imageManager.close();
+        } finally {
+          // the fallback map created in initImages is not released by the image manager
+          if (ownsBeforeImages && beforeImages != null) {
+            beforeImages.close();
+            beforeImages = null;
+          }
+        }
+      }
+    }
+
+    /**
+     * Merges a log record into its history (before-image) record using the configured merger.
+     *
+     * @throws HoodieIOException if converting either record to a buffered record fails
+     */
+    @SuppressWarnings("unchecked")
+    private Option<HoodieRecord<RowData>> mergeRowWithLog(
+        HoodieRecord<RowData> historyRecord, HoodieRecord<RowData> newRecord) {
+      try {
+        BufferedRecord<RowData> histBuf = BufferedRecords.fromHoodieRecord(
+            historyRecord, tableSchema, readerContext.getRecordContext(), props, orderingFields, deleteContext);
+        BufferedRecord<RowData> newBuf = BufferedRecords.fromHoodieRecord(
+            newRecord, tableSchema, readerContext.getRecordContext(), props, orderingFields, deleteContext);
+        BufferedRecord<RowData> merged = recordMerger.finalMerge(histBuf, newBuf);
+        return Option.ofNullable(readerContext.getRecordContext()
+            .constructHoodieRecord(merged, historyRecord.getPartitionPath()));
+      } catch (IOException e) {
+        throw new HoodieIOException("Merge base and delta payloads exception", e);
+      }
+    }
+
+    private boolean isDelete(HoodieRecord<RowData> record) {
+      return record.isDelete(deleteContext, CollectionUtils.emptyProps());
+    }
+  }
+
+  // -------------------------------------------------------------------------
+  //  AS_IS — supplemental logging modes
+  // -------------------------------------------------------------------------
+
+  /**
+   * Base iterator for CDC log files written with supplemental logging (the AS_IS inference case).
+   *
+   * <p>Field 0 of each CDC log record holds the operation code ({@code "i"}, {@code "u"} or
+   * {@code "d"}). An insert yields one after image and a delete yields one before image. An update
+   * yields the before image, then the after image on the following {@link #hasNext()} call.
+   * Subclasses decide where the images come from through {@link #getBeforeImage} and
+   * {@link #getAfterImage}. {@link #resolveAvro} turns an Avro image into a {@link RowData}
+   * restricted to the required schema.
+   */
+  public abstract static class BaseImageIterator implements ClosableIterator<RowData> {
+    private final HoodieSchema requiredSchema;
+    private final int[] requiredPos;
+    private final GenericRecordBuilder recordBuilder;
+    private final AvroToRowDataConverters.AvroToRowDataConverter avroToRowDataConverter;
+    private HoodieCDCLogRecordIterator cdcItr;
+
+    private GenericRecord cdcRecord;
+    // after image of an update, handed out on the next hasNext() call
+    private RowData sideImage;
+    private RowData currentImage;
+
+    protected BaseImageIterator(
+        org.apache.hadoop.conf.Configuration hadoopConf,
+        String tablePath,
+        HoodieSchema tableSchema,
+        HoodieSchema requiredSchema,
+        RowType requiredRowType,
+        HoodieSchema cdcSchema,
+        HoodieCDCFileSplit fileSplit) {
+      this.requiredSchema = requiredSchema;
+      this.requiredPos = computeRequiredPos(tableSchema, requiredSchema);
+      this.recordBuilder = new GenericRecordBuilder(requiredSchema.getAvroSchema());
+      this.avroToRowDataConverter = AvroToRowDataConverters.createRowConverter(requiredRowType);
+
+      final StoragePath basePath = new StoragePath(tablePath);
+      final HoodieStorage storage = HoodieStorageUtils.getStorage(tablePath, HadoopFSUtils.getStorageConf(hadoopConf));
+      final HoodieLogFile[] cdcLogFiles = fileSplit.getCdcFiles().stream()
+          .map(cdcFile -> toCdcLogFile(storage, basePath, cdcFile))
+          .toArray(HoodieLogFile[]::new);
+      this.cdcItr = new HoodieCDCLogRecordIterator(storage, cdcLogFiles, cdcSchema);
+    }
+
+    /** Resolves a CDC log file path, relative to the table path, into a {@link HoodieLogFile}. */
+    private static HoodieLogFile toCdcLogFile(HoodieStorage storage, StoragePath basePath, String cdcFile) {
+      try {
+        return new HoodieLogFile(storage.getPathInfo(new StoragePath(basePath, cdcFile)));
+      } catch (IOException e) {
+        throw new HoodieIOException("Failed to get file status for CDC log: " + cdcFile, e);
+      }
+    }
+
+    /**
+     * For each required field, finds its index among the table's data fields (metadata fields
+     * removed); the entry is -1 when the field is not one of those data fields.
+     */
+    private static int[] computeRequiredPos(HoodieSchema tableSchema, HoodieSchema requiredSchema) {
+      final List<String> dataFieldNames = HoodieSchemaUtils.removeMetadataFields(tableSchema).getFields()
+          .stream()
+          .map(HoodieSchemaField::name)
+          .collect(Collectors.toList());
+      return requiredSchema.getFields().stream()
+          .mapToInt(field -> dataFieldNames.indexOf(field.name()))
+          .toArray();
+    }
+
+    @Override
+    public boolean hasNext() {
+      if (sideImage == null) {
+        if (!cdcItr.hasNext()) {
+          return false;
+        }
+        cdcRecord = (GenericRecord) cdcItr.next();
+        resolveImage(String.valueOf(cdcRecord.get(0)));
+        return true;
+      }
+      currentImage = sideImage;
+      sideImage = null;
+      return true;
+    }
+
+    /** Returns the after image of {@code cdcRecord}, tagged with {@code rowKind}. */
+    protected abstract RowData getAfterImage(RowKind rowKind, GenericRecord cdcRecord);
+
+    /** Returns the before image of {@code cdcRecord}, tagged with {@code rowKind}. */
+    protected abstract RowData getBeforeImage(RowKind rowKind, GenericRecord cdcRecord);
+
+    @Override
+    public RowData next() {
+      return currentImage;
+    }
+
+    @Override
+    public void close() {
+      if (cdcItr == null) {
+        return;
+      }
+      cdcItr.close();
+      cdcItr = null;
+    }
+
+    /** Sets {@code currentImage} (and, for updates, {@code sideImage}) from the current CDC record. */
+    private void resolveImage(String op) {
+      if ("i".equals(op)) {
+        currentImage = getAfterImage(RowKind.INSERT, cdcRecord);
+      } else if ("u".equals(op)) {
+        currentImage = getBeforeImage(RowKind.UPDATE_BEFORE, cdcRecord);
+        sideImage = getAfterImage(RowKind.UPDATE_AFTER, cdcRecord);
+      } else if ("d".equals(op)) {
+        currentImage = getBeforeImage(RowKind.DELETE, cdcRecord);
+      } else {
+        throw new AssertionError("Unexpected CDC operation: " + op);
+      }
+    }
+
+    /** Builds the required-schema view of {@code avroRecord}, converts it to a row and tags it. */
+    protected RowData resolveAvro(RowKind rowKind, GenericRecord avroRecord) {
+      final GenericRecord requiredAvroRecord =
+          buildAvroRecordBySchema(avroRecord, requiredSchema, requiredPos, recordBuilder);
+      final RowData row = (RowData) avroToRowDataConverter.convert(requiredAvroRecord);
+      row.setRowKind(rowKind);
+      return row;
+    }
+  }
+
+  /**
+   * Reads CDC log files that carry both images ({@code DATA_BEFORE_AFTER} mode).
+   * CDC record layout: [op, ts, before_image, after_image]; both images are decoded straight
+   * from the CDC record itself.
+   */
+  public static class BeforeAfterImageIterator extends BaseImageIterator {
+    /** Index of the before image inside a CDC record. */
+    private static final int BEFORE_IMAGE_POS = 2;
+    /** Index of the after image inside a CDC record. */
+    private static final int AFTER_IMAGE_POS = 3;
+
+    public BeforeAfterImageIterator(
+        org.apache.hadoop.conf.Configuration hadoopConf,
+        String tablePath,
+        HoodieSchema tableSchema,
+        HoodieSchema requiredSchema,
+        RowType requiredRowType,
+        HoodieSchema cdcSchema,
+        HoodieCDCFileSplit fileSplit) {
+      super(hadoopConf, tablePath, tableSchema, requiredSchema, requiredRowType, cdcSchema, fileSplit);
+    }
+
+    @Override
+    protected RowData getAfterImage(RowKind rowKind, GenericRecord cdcRecord) {
+      final GenericRecord afterImage = (GenericRecord) cdcRecord.get(AFTER_IMAGE_POS);
+      return resolveAvro(rowKind, afterImage);
+    }
+
+    @Override
+    protected RowData getBeforeImage(RowKind rowKind, GenericRecord cdcRecord) {
+      final GenericRecord beforeImage = (GenericRecord) cdcRecord.get(BEFORE_IMAGE_POS);
+      return resolveAvro(rowKind, beforeImage);
+    }
+  }
+
+  /**
+   * Reads CDC log files containing op + key + before_image ({@code DATA_BEFORE} mode).
+   * The before image is decoded from the CDC record. The after image is looked up by record key
+   * in the after file-slice images loaded through {@link CdcImageManager}.
+   * CDC record layout: [op, key, before_image].
+   */
+  public static class BeforeImageIterator extends BaseImageIterator {
+    /** Index of the record key inside a CDC record. */
+    private static final int RECORD_KEY_POS = 1;
+    /** Index of the before image inside a CDC record. */
+    private static final int BEFORE_IMAGE_POS = 2;
+
+    protected ExternalSpillableMap<String, byte[]> afterImages;
+    protected final long maxCompactionMemoryInBytes;
+    protected final RowDataProjection projection;
+    protected final CdcImageManager imageManager;
+
+    public BeforeImageIterator(
+        org.apache.hadoop.conf.Configuration hadoopConf,
+        String tablePath,
+        HoodieSchema tableSchema,
+        HoodieSchema requiredSchema,
+        RowType requiredRowType,
+        int[] requiredPositions,
+        long maxCompactionMemoryInBytes,
+        HoodieSchema cdcSchema,
+        HoodieCDCFileSplit fileSplit,
+        CdcImageManager imageManager) throws IOException {
+      super(hadoopConf, tablePath, tableSchema, requiredSchema, requiredRowType, cdcSchema, fileSplit);
+      this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes;
+      this.projection = RowDataProjection.instance(requiredRowType, requiredPositions);
+      this.imageManager = imageManager;
+      // must run after the fields above are set: initImages (and its overrides) read them
+      initImages(fileSplit);
+    }
+
+    /**
+     * Loads the after images of the split's after file slice.
+     *
+     * @throws IllegalStateException if the split has no after file slice
+     */
+    protected void initImages(HoodieCDCFileSplit fileSplit) throws IOException {
+      ValidationUtils.checkState(fileSplit.getAfterFileSlice().isPresent(),
+          "Current file slice does not exist for instant: " + fileSplit.getInstant());
+      final FileSlice afterSlice = fileSplit.getAfterFileSlice().get();
+      this.afterImages = imageManager.getOrLoadImages(maxCompactionMemoryInBytes, afterSlice);
+    }
+
+    @Override
+    protected RowData getAfterImage(RowKind rowKind, GenericRecord cdcRecord) {
+      final String recordKey = cdcRecord.get(RECORD_KEY_POS).toString();
+      final RowData afterRow = imageManager.getImageRecord(recordKey, afterImages, rowKind);
+      afterRow.setRowKind(rowKind);
+      return projection.project(afterRow);
+    }
+
+    @Override
+    protected RowData getBeforeImage(RowKind rowKind, GenericRecord cdcRecord) {
+      final GenericRecord beforeImage = (GenericRecord) cdcRecord.get(BEFORE_IMAGE_POS);
+      return resolveAvro(rowKind, beforeImage);
+    }
+  }
+
+  /**
+   * Reads CDC log files containing only op + key ({@code OP_KEY_ONLY} mode).
+   * Both images are looked up by record key, in the before and after file-slice images loaded
+   * through {@link CdcImageManager}.
+   * CDC record layout: [op, key].
+   */
+  public static class RecordKeyImageIterator extends BeforeImageIterator {
+    /** Index of the record key inside a CDC record. */
+    private static final int RECORD_KEY_POS = 1;
+
+    // assigned from initImages, which the superclass constructor invokes
+    protected ExternalSpillableMap<String, byte[]> beforeImages;
+
+    public RecordKeyImageIterator(
+        org.apache.hadoop.conf.Configuration hadoopConf,
+        String tablePath,
+        HoodieSchema tableSchema,
+        HoodieSchema requiredSchema,
+        RowType requiredRowType,
+        int[] requiredPositions,
+        long maxCompactionMemoryInBytes,
+        HoodieSchema cdcSchema,
+        HoodieCDCFileSplit fileSplit,
+        CdcImageManager imageManager) throws IOException {
+      super(hadoopConf, tablePath, tableSchema, requiredSchema, requiredRowType,
+          requiredPositions, maxCompactionMemoryInBytes, cdcSchema, fileSplit, imageManager);
+    }
+
+    /**
+     * Loads the after images (via the superclass), then the before images of the split's before
+     * file slice.
+     *
+     * @throws IllegalStateException if either file slice is missing
+     */
+    @Override
+    protected void initImages(HoodieCDCFileSplit fileSplit) throws IOException {
+      super.initImages(fileSplit);
+      ValidationUtils.checkState(fileSplit.getBeforeFileSlice().isPresent(),
+          "Before file slice does not exist for instant: " + fileSplit.getInstant());
+      final FileSlice beforeSlice = fileSplit.getBeforeFileSlice().get();
+      this.beforeImages = imageManager.getOrLoadImages(maxCompactionMemoryInBytes, beforeSlice);
+    }
+
+    @Override
+    protected RowData getBeforeImage(RowKind rowKind, GenericRecord cdcRecord) {
+      final String recordKey = cdcRecord.get(RECORD_KEY_POS).toString();
+      final RowData beforeRow = imageManager.getImageRecord(recordKey, beforeImages, rowKind);
+      beforeRow.setRowKind(rowKind);
+      return projection.project(beforeRow);
+    }
+  }
+
+  // -------------------------------------------------------------------------
+  //  REPLACE_COMMIT
+  // -------------------------------------------------------------------------
+
+  /**
+   * Handles the {@code REPLACE_COMMIT} CDC inference case: reads the split's before file slice and
+   * emits each of its rows as a {@link RowKind#DELETE}, projected to the required columns.
+   */
+  public static class ReplaceCommitIterator implements ClosableIterator<RowData> {
+    private final ClosableIterator<RowData> itr;
+    private final RowDataProjection projection;
+
+    /**
+     * @param splitIteratorFunc opens a row iterator for the input split built from the before slice
+     * @throws IllegalStateException if the split has no before file slice
+     */
+    public ReplaceCommitIterator(
+        String tablePath,
+        RowType requiredRowType,
+        int[] requiredPositions,
+        long maxCompactionMemoryInBytes,
+        HoodieCDCFileSplit fileSplit,
+        Function<MergeOnReadInputSplit, ClosableIterator<RowData>> splitIteratorFunc) {
+      ValidationUtils.checkState(fileSplit.getBeforeFileSlice().isPresent(),
+          "Before file slice does not exist for instant: " + fileSplit.getInstant());
+      final FileSlice beforeSlice = fileSplit.getBeforeFileSlice().get();
+      this.itr = splitIteratorFunc.apply(fileSlice2Split(tablePath, beforeSlice, maxCompactionMemoryInBytes));
+      this.projection = RowDataProjection.instance(requiredRowType, requiredPositions);
+    }
+
+    @Override
+    public boolean hasNext() {
+      return itr.hasNext();
+    }
+
+    @Override
+    public RowData next() {
+      final RowData deleted = itr.next();
+      deleted.setRowKind(RowKind.DELETE);
+      return projection.project(deleted);
+    }
+
+    @Override
+    public void close() {
+      itr.close();
+    }
+  }
+
+  // -------------------------------------------------------------------------
+  //  Utilities
+  // -------------------------------------------------------------------------
+
+  public static MergeOnReadInputSplit fileSlice2Split(
+          String tablePath,
+          FileSlice fileSlice,
+          long maxCompactionMemoryInBytes) {
+    Option<List<String>> logPaths = Option.ofNullable(fileSlice.getLogFiles()
+            .sorted(HoodieLogFile.getLogFileComparator())
+            .map(logFile -> logFile.getPath().toString())
+            // filter out the cdc logs
+            .filter(p -> !p.endsWith(HoodieCDCUtils.CDC_LOGFILE_SUFFIX))
+            .collect(Collectors.toList()));
+    String basePath = 
fileSlice.getBaseFile().map(BaseFile::getPath).orElse(null);
+    return new MergeOnReadInputSplit(0, basePath, logPaths, 
fileSlice.getLatestInstantTime(),
+            tablePath, maxCompactionMemoryInBytes, 
FlinkOptions.REALTIME_PAYLOAD_COMBINE, null,
+            fileSlice.getFileId(), fileSlice.getPartitionPath());
+  }
+
+  /**
+   * Builds a {@link MergeOnReadInputSplit} that reads exactly one log file and no base file. The
+   * split's instant time, file id and relative partition path are all derived from the log file path.
+   */
+  public static MergeOnReadInputSplit singleLogFile2Split(String tablePath, String filePath, long maxCompactionMemoryInBytes) {
+    final StoragePath logFilePath = new StoragePath(filePath);
+    final String deltaCommitTime = FSUtils.getDeltaCommitTimeFromLogPath(logFilePath);
+    final String fileId = FSUtils.getFileIdFromLogPath(logFilePath);
+    final String partitionPath = FSUtils.getRelativePartitionPath(new StoragePath(tablePath), logFilePath.getParent());
+    return new MergeOnReadInputSplit(0, null, Option.of(Collections.singletonList(filePath)), deltaCommitTime,
+        tablePath, maxCompactionMemoryInBytes, FlinkOptions.REALTIME_PAYLOAD_COMBINE, null, fileId, partitionPath);
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/function/TestHoodieCdcSplitReaderFunction.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/function/TestHoodieCdcSplitReaderFunction.java
index d0de3f930d37..b0d0579da084 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/function/TestHoodieCdcSplitReaderFunction.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/function/TestHoodieCdcSplitReaderFunction.java
@@ -237,13 +237,7 @@ public class TestHoodieCdcSplitReaderFunction {
         1, tempDir.getAbsolutePath(), 128 * 1024 * 1024L, "file-cdc",
         EMPTY_PARTITION_PATH, changes, "read_optimized", "20230101000000000");
 
-    // The call should NOT throw IllegalArgumentException (type guard passes).
-    // It will throw some other exception when trying to do real I/O.
-    Exception ex = assertThrows(Exception.class, () -> 
function.read(cdcSplit));
-    assertNotNull(ex);
-    // Must not be an IllegalArgumentException (which the type guard throws)
-    if (ex instanceof IllegalArgumentException) {
-      throw new AssertionError("read() should not throw 
IllegalArgumentException for a CdcSourceSplit", ex);
-    }
+    // Should not throw exception
+    function.read(cdcSplit);
   }
 }

Reply via email to