This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new b36ba43fdee [HUDI-9146][part2] Integrating FileGroup reader into Flink 
merge reader (#13343)
b36ba43fdee is described below

commit b36ba43fdeec253e9a355b734709fc908ffb62c7
Author: Shuo Cheng <[email protected]>
AuthorDate: Fri May 23 22:26:18 2025 +0800

    [HUDI-9146][part2] Integrating FileGroup reader into Flink merge reader 
(#13343)
---
 .../hudi/BaseSparkInternalRowReaderContext.java    |  10 +
 .../SparkFileFormatInternalRowReaderContext.scala  |   2 +-
 .../apache/hudi/avro/HoodieAvroReaderContext.java  |  10 +
 .../hudi/common/engine/HoodieReaderContext.java    |  53 ++++
 .../table/read/FileGroupReaderSchemaHandler.java   |  41 ++-
 .../common/table/read/FileGroupRecordBuffer.java   |  34 ++-
 .../common/table/read/HoodieFileGroupReader.java   |  30 +-
 .../table/read/KeyBasedFileGroupRecordBuffer.java  |   7 +-
 .../read/PositionBasedFileGroupRecordBuffer.java   |   7 +-
 .../table/read/UnmergedFileGroupRecordBuffer.java  |   5 +-
 ...rIterator.java => CloseableFilterIterator.java} |  37 +--
 .../common/util/collection/FilterIterator.java     |   2 +-
 .../table/read/TestFileGroupRecordBuffer.java      |  10 +-
 .../examples/quickstart/TestQuickstartData.java    |   1 +
 .../apache/hudi/sink/compact/CompactOperator.java  |   2 +-
 .../table/format/FlinkRowDataReaderContext.java    |  72 ++++-
 .../org/apache/hudi/table/format/FormatUtils.java  |   2 +
 .../table/format/HoodieRowDataParquetReader.java   |  14 +-
 .../apache/hudi/table/format/RecordIterators.java  |   6 +-
 .../hudi/table/format/cdc/CdcInputFormat.java      |   2 +-
 .../table/format/mor/MergeOnReadInputFormat.java   | 328 ++-------------------
 .../table/format/mor/MergeOnReadTableState.java    |  27 --
 .../apache/hudi/util/StringToRowDataConverter.java |  21 +-
 .../table/TestHoodieFileGroupReaderOnFlink.java    |  11 +-
 .../apache/hudi/table/format/TestInputFormat.java  |  25 ++
 .../test/java/org/apache/hudi/utils/TestData.java  |   1 +
 .../hudi/utils/TestStringToRowDataConverter.java   |  20 +-
 .../hudi/hadoop/HiveHoodieReaderContext.java       |  10 +
 .../TestPositionBasedFileGroupRecordBuffer.java    |   3 +-
 29 files changed, 355 insertions(+), 438 deletions(-)

diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
index 0084fb93b3c..56ebbc89497 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
@@ -90,6 +90,11 @@ public abstract class BaseSparkInternalRowReaderContext 
extends HoodieReaderCont
     return getFieldValueFromInternalRow(row, schema, fieldName);
   }
 
+  @Override
+  public String getMetaFieldValue(InternalRow record, int pos) {
+    return record.getString(pos);
+  }
+
   @Override
   public HoodieRecord<InternalRow> 
constructHoodieRecord(BufferedRecord<InternalRow> bufferedRecord) {
     HoodieKey hoodieKey = new HoodieKey(bufferedRecord.getRecordKey(), 
partitionPath);
@@ -162,4 +167,9 @@ public abstract class BaseSparkInternalRowReaderContext 
extends HoodieReaderCont
     }
     return value;
   }
+
+  @Override
+  public InternalRow getDeleteRow(InternalRow record, String recordKey) {
+    throw new UnsupportedOperationException("Not supported for " + 
this.getClass().getSimpleName());
+  }
 }
diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
index ec9efab469a..1a08c69cd4b 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
@@ -25,6 +25,7 @@ import org.apache.hudi.avro.AvroSchemaUtils.isNullable
 import org.apache.hudi.common.engine.HoodieReaderContext
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.common.table.HoodieTableConfig
 import 
org.apache.hudi.common.table.read.PositionBasedFileGroupRecordBuffer.ROW_INDEX_TEMPORARY_COLUMN_NAME
 import org.apache.hudi.common.util.ValidationUtils.checkState
 import org.apache.hudi.common.util.collection.{CachingIterator, 
ClosableIterator, Pair => HPair}
@@ -35,7 +36,6 @@ import org.apache.hudi.util.CloseableInternalRowIterator
 import org.apache.avro.Schema
 import org.apache.avro.generic.{GenericRecord, IndexedRecord}
 import org.apache.hadoop.conf.Configuration
-import org.apache.hudi.common.table.HoodieTableConfig
 import org.apache.spark.sql.HoodieInternalRowUtils
 import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSerializer}
 import org.apache.spark.sql.catalyst.InternalRow
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java 
b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java
index ef100f1c718..c15d2b618cc 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java
@@ -98,6 +98,11 @@ public class HoodieAvroReaderContext extends 
HoodieReaderContext<IndexedRecord>
     return (GenericRecord) record;
   }
 
+  @Override
+  public IndexedRecord getDeleteRow(IndexedRecord record, String recordKey) {
+    throw new UnsupportedOperationException("Not supported for " + 
this.getClass().getSimpleName());
+  }
+
   @Override
   public Option<HoodieRecordMerger> getRecordMerger(RecordMergeMode mergeMode, 
String mergeStrategyId, String mergeImplClasses) {
     switch (mergeMode) {
@@ -121,6 +126,11 @@ public class HoodieAvroReaderContext extends 
HoodieReaderContext<IndexedRecord>
     return getFieldValueFromIndexedRecord(record, schema, fieldName);
   }
 
+  @Override
+  public String getMetaFieldValue(IndexedRecord record, int pos) {
+    return record.get(pos).toString();
+  }
+
   @Override
   public HoodieRecord<IndexedRecord> 
constructHoodieRecord(BufferedRecord<IndexedRecord> bufferedRecord) {
     if (bufferedRecord.isDelete()) {
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
index 638c58aa4d1..61f2deaaadf 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
@@ -23,11 +23,13 @@ import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.log.InstantRange;
 import org.apache.hudi.common.table.read.BufferedRecord;
 import org.apache.hudi.common.table.read.FileGroupReaderSchemaHandler;
 import org.apache.hudi.common.util.LocalAvroSchemaCache;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.CloseableFilterIterator;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.keygen.KeyGenerator;
 import org.apache.hudi.storage.HoodieStorage;
@@ -46,6 +48,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.function.BiFunction;
+import java.util.function.Predicate;
 import java.util.function.UnaryOperator;
 
 import static org.apache.hudi.common.model.HoodieRecord.DEFAULT_ORDERING_VALUE;
@@ -74,6 +77,7 @@ public abstract class HoodieReaderContext<T> {
   private Boolean needsBootstrapMerge = null;
   private Boolean shouldMergeUseRecordPosition = null;
   protected String partitionPath;
+  protected Option<InstantRange> instantRangeOpt = Option.empty();
 
   // for encoding and decoding schemas to the spillable map
   private final LocalAvroSchemaCache localAvroSchemaCache = 
LocalAvroSchemaCache.getInstance();
@@ -208,6 +212,23 @@ public abstract class HoodieReaderContext<T> {
   public abstract T convertAvroRecord(IndexedRecord avroRecord);
 
   public abstract GenericRecord convertToAvroRecord(T record, Schema schema);
+
+  /**
+   * There are two cases to handle:
+   * 1) return the delete record if it is not null;
+   * 2) otherwise, fill an empty row with the record key fields and return it.
+   *
+   * <p>For case 2), when `emitDelete` is true for the FileGroup reader and the payload for the DELETE record is empty,
+   * a record key row is emitted downstream to delete data from storage by record key on a best-effort basis.
+   * Returns null if the primary key semantics have been lost: the requested schema does not include all the record key fields.
+   *
+   * @param record    delete record
+   * @param recordKey record key
+   *
+   * @return Engine specific row which contains record key fields.
+   */
+  @Nullable
+  public abstract T getDeleteRow(T record, String recordKey);
   
   /**
    * @param mergeMode        record merge mode
@@ -228,6 +249,16 @@ public abstract class HoodieReaderContext<T> {
    */
   public abstract Object getValue(T record, Schema schema, String fieldName);
 
+  /**
+   * Get value of metadata field in a more efficient way than #getValue.
+   *
+   * @param record The record in engine-specific type.
+   * @param pos    The position of the metadata field.
+   *
+   * @return The value for the target metadata field.
+   */
+  public abstract String getMetaFieldValue(T record, int pos);
+
   /**
    * Cast to Java boolean value.
    * If the object is not compatible with boolean type, throws.
@@ -241,6 +272,28 @@ public abstract class HoodieReaderContext<T> {
     }
   }
 
+  /**
+   * Get the {@link InstantRange} filter.
+   */
+  public Option<InstantRange> getInstantRange() {
+    return instantRangeOpt;
+  }
+
+  /**
+   * Apply the {@link InstantRange} filter to the file record iterator.
+   *
+   * @param fileRecordIterator File record iterator.
+   *
+   * @return File record iterator filtered by {@link InstantRange}.
+   */
+  public ClosableIterator<T> applyInstantRangeFilter(ClosableIterator<T> 
fileRecordIterator) {
+    InstantRange instantRange = getInstantRange().get();
+    final Schema.Field commitTimeField = 
schemaHandler.getRequiredSchema().getField(HoodieRecord.COMMIT_TIME_METADATA_FIELD);
+    final int commitTimePos = commitTimeField.pos();
+    Predicate<T> instantFilter = row -> 
instantRange.isInRange(getMetaFieldValue(row, commitTimePos));
+    return new CloseableFilterIterator<>(fileRecordIterator, instantFilter);
+  }
+
   /**
    * Gets the record key in String.
    *
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderSchemaHandler.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderSchemaHandler.java
index e39dbf3cd52..816ec21a9fb 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderSchemaHandler.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderSchemaHandler.java
@@ -90,6 +90,7 @@ public class FileGroupReaderSchemaHandler<T> {
 
   private final Option<Pair<String, String>> customDeleteMarkerKeyValue;
   private final boolean hasBuiltInDelete;
+  private final int hoodieOperationPos;
 
   public FileGroupReaderSchemaHandler(HoodieReaderContext<T> readerContext,
                                       Schema tableSchema,
@@ -109,6 +110,7 @@ public class FileGroupReaderSchemaHandler<T> {
     this.customDeleteMarkerKeyValue = deleteConfigs.getLeft();
     this.hasBuiltInDelete = deleteConfigs.getRight();
     this.requiredSchema = AvroSchemaCache.intern(prepareRequiredSchema());
+    this.hoodieOperationPos = 
Option.ofNullable(requiredSchema.getField(HoodieRecord.OPERATION_METADATA_FIELD)).map(Schema.Field::pos).orElse(-1);
     this.internalSchema = pruneInternalSchema(requiredSchema, 
internalSchemaOpt);
     this.internalSchemaOpt = getInternalSchemaOpt(internalSchemaOpt);
     readerContext.setNeedsBootstrapMerge(this.needsBootstrapMerge);
@@ -150,6 +152,10 @@ public class FileGroupReaderSchemaHandler<T> {
     return hasBuiltInDelete;
   }
 
+  public int getHoodieOperationPos() {
+    return hoodieOperationPos;
+  }
+
   private InternalSchema pruneInternalSchema(Schema requiredSchema, 
Option<InternalSchema> internalSchemaOption) {
     if (!internalSchemaOption.isPresent()) {
       return InternalSchema.getEmptyInternalSchema();
@@ -172,8 +178,13 @@ public class FileGroupReaderSchemaHandler<T> {
 
   @VisibleForTesting
   Schema generateRequiredSchema() {
-    //might need to change this if other queries than mor have mandatory fields
+    boolean hasInstantRange = readerContext.getInstantRange().isPresent();
     if (!needsMORMerge) {
+      if (hasInstantRange && !findNestedField(requestedSchema, 
HoodieRecord.COMMIT_TIME_METADATA_FIELD).isPresent()) {
+        List<Schema.Field> addedFields = new ArrayList<>();
+        addedFields.add(getField(tableSchema, 
HoodieRecord.COMMIT_TIME_METADATA_FIELD));
+        return appendFieldsToSchemaDedupNested(requestedSchema, addedFields);
+      }
       return requestedSchema;
     }
 
@@ -186,14 +197,9 @@ public class FileGroupReaderSchemaHandler<T> {
     List<Schema.Field> addedFields = new ArrayList<>();
     for (String field : getMandatoryFieldsForMerging(
         hoodieTableConfig, properties, tableSchema, recordMerger,
-        hasBuiltInDelete, customDeleteMarkerKeyValue)) {
+        hasBuiltInDelete, customDeleteMarkerKeyValue, hasInstantRange)) {
       if (!findNestedField(requestedSchema, field).isPresent()) {
-        Option<Schema.Field> foundFieldOpt = findNestedField(tableSchema, 
field);
-        if (!foundFieldOpt.isPresent()) {
-          throw new IllegalArgumentException("Field: " + field + " does not 
exist in the table schema");
-        }
-        Schema.Field foundField = foundFieldOpt.get();
-        addedFields.add(foundField);
+        addedFields.add(getField(tableSchema, field));
       }
     }
 
@@ -209,7 +215,8 @@ public class FileGroupReaderSchemaHandler<T> {
                                                        Schema tableSchema,
                                                        
Option<HoodieRecordMerger> recordMerger,
                                                        boolean 
hasBuiltInDelete,
-                                                       Option<Pair<String, 
String>> customDeleteMarkerKeyAndValue) {
+                                                       Option<Pair<String, 
String>> customDeleteMarkerKeyAndValue,
+                                                       boolean 
hasInstantRange) {
     Triple<RecordMergeMode, String, String> mergingConfigs = 
HoodieTableConfig.inferCorrectMergingBehavior(
         cfg.getRecordMergeMode(),
         cfg.getPayloadClass(),
@@ -223,6 +230,11 @@ public class FileGroupReaderSchemaHandler<T> {
 
     // Use Set to avoid duplicated fields.
     Set<String> requiredFields = new HashSet<>();
+
+    if (hasInstantRange) {
+      requiredFields.add(HoodieRecord.COMMIT_TIME_METADATA_FIELD);
+    }
+
     // Add record key fields.
     if (cfg.populateMetaFields()) {
       requiredFields.add(HoodieRecord.RECORD_KEY_METADATA_FIELD);
@@ -291,6 +303,17 @@ public class FileGroupReaderSchemaHandler<T> {
     return createNewSchemaFromFieldsWithReference(tableSchema, fields);
   }
 
+  /**
+   * Get {@link Schema.Field} from {@link Schema} by field name.
+   */
+  private static Schema.Field getField(Schema schema, String fieldName) {
+    Option<Schema.Field> foundFieldOpt = findNestedField(schema, fieldName);
+    if (!foundFieldOpt.isPresent()) {
+      throw new IllegalArgumentException("Field: " + fieldName + " does not 
exist in the table schema");
+    }
+    return foundFieldOpt.get();
+  }
+
   /**
    * Fetches the delete configs from the configs.
    *
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java
index 0f89a6c426a..020e19cf0fa 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java
@@ -86,6 +86,7 @@ public abstract class FileGroupRecordBuffer<T> implements 
HoodieFileGroupRecordB
   protected final HoodieReadStats readStats;
   protected final boolean shouldCheckCustomDeleteMarker;
   protected final boolean shouldCheckBuiltInDeleteMarker;
+  protected final boolean emitDelete;
   protected ClosableIterator<T> baseFileIterator;
   protected Iterator<BufferedRecord<T>> logRecordIterator;
   protected T nextRecord;
@@ -99,7 +100,8 @@ public abstract class FileGroupRecordBuffer<T> implements 
HoodieFileGroupRecordB
                                   RecordMergeMode recordMergeMode,
                                   TypedProperties props,
                                   HoodieReadStats readStats,
-                                  Option<String> orderingFieldName) {
+                                  Option<String> orderingFieldName,
+                                  boolean emitDelete) {
     this.readerContext = readerContext;
     this.readerSchema = 
AvroSchemaCache.intern(readerContext.getSchemaHandler().getRequiredSchema());
     this.recordMergeMode = recordMergeMode;
@@ -120,6 +122,7 @@ public abstract class FileGroupRecordBuffer<T> implements 
HoodieFileGroupRecordB
     boolean isBitCaskDiskMapCompressionEnabled = 
props.getBoolean(DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(),
         DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue());
     this.readStats = readStats;
+    this.emitDelete = emitDelete;
     try {
       // Store merged records for all versions for this log file, set the 
in-memory footprint to maxInMemoryMapSize
       this.records = new ExternalSpillableMap<>(maxMemorySizeInBytes, 
spillableMapBasePath, new DefaultSizeEstimator<>(),
@@ -163,6 +166,18 @@ public abstract class FileGroupRecordBuffer<T> implements 
HoodieFileGroupRecordB
     return columnValue != null && readerContext.castToBoolean(columnValue);
   }
 
+  /**
+   * Returns whether the record is a DELETE marked by the '_hoodie_operation' 
field.
+   */
+  protected final boolean isDeleteHoodieOperation(T record) {
+    int hoodieOperationPos = 
readerContext.getSchemaHandler().getHoodieOperationPos();
+    if (hoodieOperationPos < 0) {
+      return false;
+    }
+    String hoodieOperation = readerContext.getMetaFieldValue(record, 
hoodieOperationPos);
+    return hoodieOperation != null && 
HoodieOperation.isDeleteRecord(hoodieOperation);
+  }
+
   @Override
   public void setBaseFileIterator(ClosableIterator<T> baseFileIterator) {
     this.baseFileIterator = baseFileIterator;
@@ -539,15 +554,20 @@ public abstract class FileGroupRecordBuffer<T> implements 
HoodieFileGroupRecordB
 
   protected boolean hasNextBaseRecord(T baseRecord, BufferedRecord<T> 
logRecordInfo) throws IOException {
     if (logRecordInfo != null) {
-      BufferedRecord<T> bufferedRecord = 
BufferedRecord.forRecordWithContext(baseRecord, readerSchema, readerContext, 
orderingFieldName, false);
-      Pair<Boolean, T> isDeleteAndRecord = merge(bufferedRecord, 
logRecordInfo);
+      BufferedRecord<T> baseRecordInfo = 
BufferedRecord.forRecordWithContext(baseRecord, readerSchema, readerContext, 
orderingFieldName, false);
+      Pair<Boolean, T> isDeleteAndRecord = merge(baseRecordInfo, 
logRecordInfo);
       if (!isDeleteAndRecord.getLeft()) {
         // Updates
         nextRecord = readerContext.seal(isDeleteAndRecord.getRight());
         readStats.incrementNumUpdates();
         return true;
+      } else if (emitDelete) {
+        // emit Deletes
+        nextRecord = readerContext.getDeleteRow(isDeleteAndRecord.getRight(), 
baseRecordInfo.getRecordKey());
+        readStats.incrementNumDeletes();
+        return nextRecord != null;
       } else {
-        // Deletes
+        // not emit Deletes
         readStats.incrementNumDeletes();
         return false;
       }
@@ -570,6 +590,12 @@ public abstract class FileGroupRecordBuffer<T> implements 
HoodieFileGroupRecordB
         nextRecord = nextRecordInfo.getRecord();
         readStats.incrementNumInserts();
         return true;
+      } else if (emitDelete) {
+        nextRecord = readerContext.getDeleteRow(nextRecordInfo.getRecord(), 
nextRecordInfo.getRecordKey());
+        readStats.incrementNumDeletes();
+        if (nextRecord != null) {
+          return true;
+        }
       } else {
         readStats.incrementNumDeletes();
       }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
index ed9e2582716..13dc39af3b1 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
@@ -112,13 +112,13 @@ public final class HoodieFileGroupReader<T> implements 
Closeable {
       long start, long length, boolean shouldUseRecordPosition) {
     this(readerContext, storage, tablePath, latestCommitTime, fileSlice, 
dataSchema,
         requestedSchema, internalSchemaOpt, hoodieTableMetaClient, props, 
start, length,
-        shouldUseRecordPosition, false);
+        shouldUseRecordPosition, false, false);
   }
 
   private HoodieFileGroupReader(HoodieReaderContext<T> readerContext, 
HoodieStorage storage, String tablePath,
                                String latestCommitTime, FileSlice fileSlice, 
Schema dataSchema, Schema requestedSchema,
                                Option<InternalSchema> internalSchemaOpt, 
HoodieTableMetaClient hoodieTableMetaClient, TypedProperties props,
-                               long start, long length, boolean 
shouldUseRecordPosition, boolean allowInflightInstants) {
+                               long start, long length, boolean 
shouldUseRecordPosition, boolean allowInflightInstants, boolean emitDelete) {
     this.readerContext = readerContext;
     this.storage = storage;
     this.hoodieBaseFileOption = fileSlice.getBaseFile();
@@ -169,7 +169,7 @@ public final class HoodieFileGroupReader<T> implements 
Closeable {
     this.readStats = new HoodieReadStats();
     this.recordBuffer = getRecordBuffer(readerContext, hoodieTableMetaClient,
         recordMergeMode, props, hoodieBaseFileOption, this.logFiles.isEmpty(),
-        isSkipMerge, shouldUseRecordPosition, readStats);
+        isSkipMerge, shouldUseRecordPosition, readStats, emitDelete);
     this.allowInflightInstants = allowInflightInstants;
   }
 
@@ -184,18 +184,19 @@ public final class HoodieFileGroupReader<T> implements 
Closeable {
                                                    boolean hasNoLogFiles,
                                                    boolean isSkipMerge,
                                                    boolean 
shouldUseRecordPosition,
-                                                   HoodieReadStats readStats) {
+                                                   HoodieReadStats readStats,
+                                                   boolean emitDelete) {
     if (hasNoLogFiles) {
       return null;
     } else if (isSkipMerge) {
       return new UnmergedFileGroupRecordBuffer<>(
-          readerContext, hoodieTableMetaClient, recordMergeMode, props, 
readStats);
+          readerContext, hoodieTableMetaClient, recordMergeMode, props, 
readStats, emitDelete);
     } else if (shouldUseRecordPosition && baseFileOption.isPresent()) {
       return new PositionBasedFileGroupRecordBuffer<>(
-          readerContext, hoodieTableMetaClient, recordMergeMode, 
baseFileOption.get().getCommitTime(), props, readStats, orderingFieldName);
+          readerContext, hoodieTableMetaClient, recordMergeMode, 
baseFileOption.get().getCommitTime(), props, readStats, orderingFieldName, 
emitDelete);
     } else {
       return new KeyBasedFileGroupRecordBuffer<>(
-          readerContext, hoodieTableMetaClient, recordMergeMode, props, 
readStats, orderingFieldName);
+          readerContext, hoodieTableMetaClient, recordMergeMode, props, 
readStats, orderingFieldName, emitDelete);
     }
   }
 
@@ -224,17 +225,19 @@ public final class HoodieFileGroupReader<T> implements 
Closeable {
     }
 
     StoragePathInfo baseFileStoragePathInfo = baseFile.getPathInfo();
+    final ClosableIterator<T> recordIterator;
     if (baseFileStoragePathInfo != null) {
-      return readerContext.getFileRecordIterator(
+      recordIterator = readerContext.getFileRecordIterator(
           baseFileStoragePathInfo, start, length,
           readerContext.getSchemaHandler().getTableSchema(),
           readerContext.getSchemaHandler().getRequiredSchema(), storage);
     } else {
-      return readerContext.getFileRecordIterator(
+      recordIterator = readerContext.getFileRecordIterator(
           baseFile.getStoragePath(), start, length,
           readerContext.getSchemaHandler().getTableSchema(),
           readerContext.getSchemaHandler().getRequiredSchema(), storage);
     }
+    return readerContext.getInstantRange().isPresent() ? 
readerContext.applyInstantRangeFilter(recordIterator) : recordIterator;
   }
 
   private ClosableIterator<T> makeBootstrapBaseFileIterator(HoodieBaseFile 
baseFile) throws IOException {
@@ -341,6 +344,7 @@ public final class HoodieFileGroupReader<T> implements 
Closeable {
         .withLogFiles(logFiles)
         .withReverseReader(false)
         .withBufferSize(getIntWithAltKeys(props, 
HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE))
+        .withInstantRange(readerContext.getInstantRange())
         .withPartition(getRelativePartitionPath(
             new StoragePath(path), logFiles.get(0).getPath().getParent()))
         .withRecordBuffer(recordBuffer)
@@ -435,6 +439,7 @@ public final class HoodieFileGroupReader<T> implements 
Closeable {
     private long length = Long.MAX_VALUE;
     private boolean shouldUseRecordPosition = false;
     private boolean allowInflightInstants = false;
+    private boolean emitDelete;
 
     public Builder<T> withReaderContext(HoodieReaderContext<T> readerContext) {
       this.readerContext = readerContext;
@@ -497,6 +502,11 @@ public final class HoodieFileGroupReader<T> implements 
Closeable {
       return this;
     }
 
+    public Builder<T> withEmitDelete(boolean emitDelete) {
+      this.emitDelete = emitDelete;
+      return this;
+    }
+
     public HoodieFileGroupReader<T> build() {
       ValidationUtils.checkArgument(readerContext != null, "Reader context is 
required");
       ValidationUtils.checkArgument(hoodieTableMetaClient != null, "Hoodie 
table meta client is required");
@@ -515,7 +525,7 @@ public final class HoodieFileGroupReader<T> implements 
Closeable {
       return new HoodieFileGroupReader<>(
           readerContext, storage, tablePath, latestCommitTime, fileSlice,
           dataSchema, requestedSchema, internalSchemaOpt, 
hoodieTableMetaClient,
-          props, start, length, shouldUseRecordPosition, 
allowInflightInstants);
+          props, start, length, shouldUseRecordPosition, 
allowInflightInstants, emitDelete);
     }
   }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/KeyBasedFileGroupRecordBuffer.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/KeyBasedFileGroupRecordBuffer.java
index 2ccc740fb77..5bbce2ad256 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/KeyBasedFileGroupRecordBuffer.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/KeyBasedFileGroupRecordBuffer.java
@@ -53,8 +53,9 @@ public class KeyBasedFileGroupRecordBuffer<T> extends 
FileGroupRecordBuffer<T> {
                                        RecordMergeMode recordMergeMode,
                                        TypedProperties props,
                                        HoodieReadStats readStats,
-                                       Option<String> orderingFieldName) {
-    super(readerContext, hoodieTableMetaClient, recordMergeMode, props, 
readStats, orderingFieldName);
+                                       Option<String> orderingFieldName,
+                                       boolean emitDelete) {
+    super(readerContext, hoodieTableMetaClient, recordMergeMode, props, 
readStats, orderingFieldName, emitDelete);
   }
 
   @Override
@@ -77,7 +78,7 @@ public class KeyBasedFileGroupRecordBuffer<T> extends 
FileGroupRecordBuffer<T> {
     try (ClosableIterator<T> recordIterator = 
recordsIteratorSchemaPair.getLeft()) {
       while (recordIterator.hasNext()) {
         T nextRecord = recordIterator.next();
-        boolean isDelete = isBuiltInDeleteRecord(nextRecord) || 
isCustomDeleteRecord(nextRecord);
+        boolean isDelete = isBuiltInDeleteRecord(nextRecord) || 
isCustomDeleteRecord(nextRecord) || isDeleteHoodieOperation(nextRecord);
         BufferedRecord<T> bufferedRecord = 
BufferedRecord.forRecordWithContext(nextRecord, schema, readerContext, 
orderingFieldName, isDelete);
         processNextDataRecord(bufferedRecord, bufferedRecord.getRecordKey());
       }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/PositionBasedFileGroupRecordBuffer.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/PositionBasedFileGroupRecordBuffer.java
index 5df2ea91e07..152b7107b9b 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/PositionBasedFileGroupRecordBuffer.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/PositionBasedFileGroupRecordBuffer.java
@@ -71,8 +71,9 @@ public class PositionBasedFileGroupRecordBuffer<T> extends 
KeyBasedFileGroupReco
                                             String baseFileInstantTime,
                                             TypedProperties props,
                                             HoodieReadStats readStats,
-                                            Option<String> orderingFieldName) {
-    super(readerContext, hoodieTableMetaClient, recordMergeMode, props, 
readStats, orderingFieldName);
+                                            Option<String> orderingFieldName,
+                                            boolean emitDelete) {
+    super(readerContext, hoodieTableMetaClient, recordMergeMode, props, 
readStats, orderingFieldName, emitDelete);
     this.baseFileInstantTime = baseFileInstantTime;
   }
 
@@ -130,7 +131,7 @@ public class PositionBasedFileGroupRecordBuffer<T> extends 
KeyBasedFileGroupReco
 
         long recordPosition = recordPositions.get(recordIndex++);
         T evolvedNextRecord = 
schemaTransformerWithEvolvedSchema.getLeft().apply(nextRecord);
-        boolean isDelete = isBuiltInDeleteRecord(evolvedNextRecord) || 
isCustomDeleteRecord(evolvedNextRecord);
+        boolean isDelete = isBuiltInDeleteRecord(evolvedNextRecord) || 
isCustomDeleteRecord(evolvedNextRecord) || 
isDeleteHoodieOperation(evolvedNextRecord);
         BufferedRecord<T> bufferedRecord = 
BufferedRecord.forRecordWithContext(evolvedNextRecord, schema, readerContext, 
orderingFieldName, isDelete);
         processNextDataRecord(bufferedRecord, recordPosition);
       }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/UnmergedFileGroupRecordBuffer.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/UnmergedFileGroupRecordBuffer.java
index c853eb088b7..a9e1f66ef19 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/UnmergedFileGroupRecordBuffer.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/UnmergedFileGroupRecordBuffer.java
@@ -50,8 +50,9 @@ public class UnmergedFileGroupRecordBuffer<T> extends 
FileGroupRecordBuffer<T> {
       HoodieTableMetaClient hoodieTableMetaClient,
       RecordMergeMode recordMergeMode,
       TypedProperties props,
-      HoodieReadStats readStats) {
-    super(readerContext, hoodieTableMetaClient, recordMergeMode, props, 
readStats, Option.empty());
+      HoodieReadStats readStats,
+      boolean emitDelete) {
+    super(readerContext, hoodieTableMetaClient, recordMergeMode, props, 
readStats, Option.empty(), emitDelete);
     this.currentInstantLogBlocks = new ArrayDeque<>();
   }
 
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/FilterIterator.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/CloseableFilterIterator.java
similarity index 52%
copy from 
hudi-common/src/main/java/org/apache/hudi/common/util/collection/FilterIterator.java
copy to 
hudi-common/src/main/java/org/apache/hudi/common/util/collection/CloseableFilterIterator.java
index 4ebc7c10129..c706d847821 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/FilterIterator.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/CloseableFilterIterator.java
@@ -18,45 +18,20 @@
 
 package org.apache.hudi.common.util.collection;
 
-import org.apache.hudi.common.util.ValidationUtils;
-
 import java.util.Iterator;
 import java.util.function.Predicate;
 
 /**
- * An iterator that filters elements from a source iterator based on a 
predicate.
- * @param <R> Type of elements in the iterator
+ * A {@link FilterIterator} that must be closed after iteration (to clean up 
resources).
  */
-public class FilterIterator<R> implements Iterator<R> {
-
-  private final Iterator<R> source;
-
-  private final Predicate<R> filter;
-
-  private R current;
+public class CloseableFilterIterator<R> extends FilterIterator<R> implements 
ClosableIterator<R> {
 
-  public FilterIterator(Iterator<R> source, Predicate<R> filter) {
-    this.source = source;
-    this.filter = filter;
-  }
-
-  @Override
-  public boolean hasNext() {
-    while (current == null && source.hasNext()) {
-      R next = source.next();
-      if (filter.test(next)) {
-        current = next;
-        break;
-      }
-    }
-    return current != null;
+  public CloseableFilterIterator(Iterator<R> source, Predicate<R> filter) {
+    super(source, filter);
   }
 
   @Override
-  public R next() {
-    ValidationUtils.checkArgument(hasNext(), "No more elements to iterate");
-    R next = current;
-    current = null;
-    return next;
+  public void close() {
+    ((ClosableIterator<R>) source).close();
   }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/FilterIterator.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/FilterIterator.java
index 4ebc7c10129..37b5565ce7e 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/FilterIterator.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/FilterIterator.java
@@ -29,7 +29,7 @@ import java.util.function.Predicate;
  */
 public class FilterIterator<R> implements Iterator<R> {
 
-  private final Iterator<R> source;
+  protected final Iterator<R> source;
 
   private final Predicate<R> filter;
 
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestFileGroupRecordBuffer.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestFileGroupRecordBuffer.java
index 06552a9391c..71f0b359999 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestFileGroupRecordBuffer.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestFileGroupRecordBuffer.java
@@ -153,6 +153,7 @@ class TestFileGroupRecordBuffer {
                                            HoodieTableVersion tableVersion,
                                            String mergeStrategyId) {
     HoodieReaderContext readerContext = mock(HoodieReaderContext.class);
+    when(readerContext.getInstantRange()).thenReturn(Option.empty());
     when(readerContext.getHasBootstrapBaseFile()).thenReturn(false);
     when(readerContext.getHasLogFiles()).thenReturn(true);
     HoodieRecordMerger recordMerger = mock(HoodieRecordMerger.class);
@@ -290,7 +291,8 @@ class TestFileGroupRecordBuffer {
             RecordMergeMode.COMMIT_TIME_ORDERING,
             props,
             readStats,
-            Option.empty());
+            Option.empty(),
+            false);
     when(readerContext.getValue(any(), any(), any())).thenReturn(null);
     assertFalse(keyBasedBuffer.isCustomDeleteRecord(record));
 
@@ -302,7 +304,8 @@ class TestFileGroupRecordBuffer {
             RecordMergeMode.COMMIT_TIME_ORDERING,
             props,
             readStats,
-            Option.empty());
+            Option.empty(),
+            false);
     when(readerContext.getValue(any(), any(), any())).thenReturn("i");
     assertFalse(keyBasedBuffer.isCustomDeleteRecord(record));
     when(readerContext.getValue(any(), any(), any())).thenReturn("d");
@@ -323,7 +326,8 @@ class TestFileGroupRecordBuffer {
             RecordMergeMode.COMMIT_TIME_ORDERING,
             props,
             readStats,
-            Option.empty());
+            Option.empty(),
+            false);
 
     // CASE 1: With custom delete marker.
     GenericRecord record = new GenericData.Record(schema);
diff --git 
a/hudi-examples/hudi-examples-flink/src/test/java/org/apache/hudi/examples/quickstart/TestQuickstartData.java
 
b/hudi-examples/hudi-examples-flink/src/test/java/org/apache/hudi/examples/quickstart/TestQuickstartData.java
index 5e7613f225a..565543eb833 100644
--- 
a/hudi-examples/hudi-examples-flink/src/test/java/org/apache/hudi/examples/quickstart/TestQuickstartData.java
+++ 
b/hudi-examples/hudi-examples-flink/src/test/java/org/apache/hudi/examples/quickstart/TestQuickstartData.java
@@ -342,6 +342,7 @@ public class TestQuickstartData {
   /**
    * Returns the scanner to read avro log files.
    */
+  @Deprecated
   private static HoodieMergedLogRecordScanner getScanner(
       HoodieStorage storage,
       String basePath,
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactOperator.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactOperator.java
index 690ef55e8b8..7159f9a1f32 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactOperator.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactOperator.java
@@ -185,7 +185,7 @@ public class CompactOperator extends 
TableStreamOperator<CompactionCommitEvent>
       Supplier<InternalSchemaManager> internalSchemaManager = () -> 
InternalSchemaManager.get(conf, metaClient);
       // initialize storage conf lazily.
       StorageConfiguration<?> readerConf = 
writeClient.getEngineContext().getStorageConf();
-      return Option.of(new FlinkRowDataReaderContext(readerConf, 
internalSchemaManager, Collections.emptyList(), metaClient.getTableConfig()));
+      return Option.of(new FlinkRowDataReaderContext(readerConf, 
internalSchemaManager, Collections.emptyList(), metaClient.getTableConfig(), 
Option.empty()));
     } else {
       // always using avro record merger for legacy compaction since log 
scanner do not support rowdata reading yet.
       
writeClient.getConfig().setRecordMergerClass(HoodieAvroRecordMerger.class.getName());
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
index 808860478b3..cfea2d2c64a 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
@@ -22,7 +22,6 @@ import org.apache.hudi.client.model.BootstrapRowData;
 import org.apache.hudi.client.model.CommitTimeFlinkRecordMerger;
 import org.apache.hudi.client.model.EventTimeFlinkRecordMerger;
 import org.apache.hudi.client.model.HoodieFlinkRecord;
-import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.engine.EngineType;
 import org.apache.hudi.common.engine.HoodieReaderContext;
@@ -34,7 +33,9 @@ import org.apache.hudi.common.model.HoodieOperation;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.log.InstantRange;
 import org.apache.hudi.common.table.read.BufferedRecord;
+import org.apache.hudi.common.table.read.FileGroupReaderSchemaHandler;
 import org.apache.hudi.common.util.HoodieRecordUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
@@ -43,7 +44,8 @@ import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieValidationException;
 import org.apache.hudi.io.storage.HoodieIOFactory;
-import org.apache.hudi.source.ExpressionPredicates.Predicate;
+import org.apache.hudi.keygen.KeyGenUtils;
+import org.apache.hudi.source.ExpressionPredicates;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
@@ -52,6 +54,7 @@ import org.apache.hudi.util.RowDataAvroQueryContexts;
 import org.apache.hudi.util.RowDataUtils;
 import org.apache.hudi.util.RowProjection;
 import org.apache.hudi.util.SchemaEvolvingRowDataProjection;
+import org.apache.hudi.util.StringToRowDataConverter;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
@@ -65,6 +68,7 @@ import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.types.RowKind;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Supplier;
@@ -79,21 +83,28 @@ import static 
org.apache.hudi.common.model.HoodieRecord.DEFAULT_ORDERING_VALUE;
  * log files with Flink parquet reader.
  */
 public class FlinkRowDataReaderContext extends HoodieReaderContext<RowData> {
-  private final List<Predicate> predicates;
+  private final List<ExpressionPredicates.Predicate> predicates;
   private final Supplier<InternalSchemaManager> internalSchemaManager;
   private final boolean utcTimezone;
-  private final HoodieConfig hoodieConfig;
+  private final HoodieTableConfig tableConfig;
+  // the converter is used to create a RowData that contains only the primary key
+  // fields for DELETE cases; it will not be initialized if primary key semantics 
are lost.
+  // E.g., if the pk fields are [a, b] but the user only selects a, then the pk
+  // semantics are lost.
+  private StringToRowDataConverter recordKeyRowConverter;
 
   public FlinkRowDataReaderContext(
       StorageConfiguration<?> storageConfiguration,
       Supplier<InternalSchemaManager> internalSchemaManager,
-      List<Predicate> predicates,
-      HoodieTableConfig tableConfig) {
+      List<ExpressionPredicates.Predicate> predicates,
+      HoodieTableConfig tableConfig,
+      Option<InstantRange> instantRangeOpt) {
     super(storageConfiguration, tableConfig);
-    this.hoodieConfig = tableConfig;
+    this.tableConfig = tableConfig;
     this.internalSchemaManager = internalSchemaManager;
     this.predicates = predicates;
     this.utcTimezone = 
getStorageConfiguration().getBoolean(FlinkOptions.READ_UTC_TIMEZONE.key(), 
FlinkOptions.READ_UTC_TIMEZONE.defaultValue());
+    this.instantRangeOpt = instantRangeOpt;
   }
 
   @Override
@@ -111,9 +122,32 @@ public class FlinkRowDataReaderContext extends 
HoodieReaderContext<RowData> {
     HoodieRowDataParquetReader rowDataParquetReader =
         (HoodieRowDataParquetReader) HoodieIOFactory.getIOFactory(storage)
             .getReaderFactory(HoodieRecord.HoodieRecordType.FLINK)
-            .getFileReader(hoodieConfig, filePath, HoodieFileFormat.PARQUET, 
Option.empty());
+            .getFileReader(tableConfig, filePath, HoodieFileFormat.PARQUET, 
Option.empty());
     DataType rowType = 
RowDataAvroQueryContexts.fromAvroSchema(dataSchema).getRowType();
-    return rowDataParquetReader.getRowDataIterator(schemaManager, rowType, 
requiredSchema);
+    return rowDataParquetReader.getRowDataIterator(schemaManager, rowType, 
requiredSchema, predicates);
+  }
+
+  @Override
+  public void setSchemaHandler(FileGroupReaderSchemaHandler<RowData> 
schemaHandler) {
+    super.setSchemaHandler(schemaHandler);
+
+    Option<String[]> recordKeysOpt = tableConfig.getRecordKeyFields();
+    if (recordKeysOpt.isEmpty()) {
+      return;
+    }
+    // primary key semantics are lost if not all primary key fields are included 
in the requested schema.
+    boolean pkSemanticLost = Arrays.stream(recordKeysOpt.get()).anyMatch(k -> 
schemaHandler.getRequestedSchema().getField(k) == null);
+    if (pkSemanticLost) {
+      return;
+    }
+    // get primary key field position in required schema.
+    Schema requiredSchema = schemaHandler.getRequiredSchema();
+    int[] pkFieldsPos = Arrays.stream(recordKeysOpt.get())
+        .map(k -> 
Option.ofNullable(requiredSchema.getField(k)).map(Schema.Field::pos).orElse(-1))
+        .mapToInt(Integer::intValue)
+        .toArray();
+    recordKeyRowConverter = new StringToRowDataConverter(
+        pkFieldsPos, (RowType) 
RowDataAvroQueryContexts.fromAvroSchema(requiredSchema).getRowType().getLogicalType());
   }
 
   @Override
@@ -144,6 +178,11 @@ public class FlinkRowDataReaderContext extends 
HoodieReaderContext<RowData> {
     }
   }
 
+  @Override
+  public String getMetaFieldValue(RowData record, int pos) {
+    return record.getString(pos).toString();
+  }
+
   @Override
   public HoodieRecord<RowData> constructHoodieRecord(BufferedRecord<RowData> 
bufferedRecord) {
     HoodieKey hoodieKey = new HoodieKey(bufferedRecord.getRecordKey(), 
partitionPath);
@@ -256,6 +295,21 @@ public class FlinkRowDataReaderContext extends 
HoodieReaderContext<RowData> {
     return (GenericRecord) 
RowDataAvroQueryContexts.fromAvroSchema(schema).getRowDataToAvroConverter().convert(schema,
 record);
   }
 
+  @Override
+  public RowData getDeleteRow(RowData record, String recordKey) {
+    if (record != null) {
+      return record;
+    }
+    // no need to emit a record-key row if primary key semantics are lost
+    if (recordKeyRowConverter == null) {
+      return null;
+    }
+    final String[] pkVals = KeyGenUtils.extractRecordKeys(recordKey);
+    RowData recordKeyRow = recordKeyRowConverter.convert(pkVals);
+    recordKeyRow.setRowKind(RowKind.DELETE);
+    return recordKeyRow;
+  }
+
   @Override
   public RowData convertAvroRecord(IndexedRecord avroRecord) {
     Schema recordSchema = avroRecord.getSchema();
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
index 811f68d61ed..b4e5d14a9e0 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
@@ -135,6 +135,7 @@ public class FormatUtils {
     }
   }
 
+  @Deprecated
   public static HoodieMergedLogRecordScanner logScanner(
       MergeOnReadInputSplit split,
       Schema logSchema,
@@ -163,6 +164,7 @@ public class FormatUtils {
         .build();
   }
 
+  @Deprecated
   public static HoodieMergedLogRecordScanner logScanner(
       List<String> logPaths,
       Schema logSchema,
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataParquetReader.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataParquetReader.java
index 2902aea8089..e68ebcc442b 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataParquetReader.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataParquetReader.java
@@ -30,6 +30,7 @@ import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.io.storage.HoodieFileReader;
 import org.apache.hudi.io.storage.HoodieIOFactory;
 import org.apache.hudi.io.storage.row.parquet.ParquetSchemaConverter;
+import org.apache.hudi.source.ExpressionPredicates.Predicate;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.util.AvroSchemaConverter;
@@ -43,6 +44,7 @@ import org.apache.parquet.schema.MessageType;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import java.util.Set;
@@ -81,7 +83,7 @@ public class HoodieRowDataParquetReader implements 
HoodieFileReader<RowData>  {
 
   @Override
   public ClosableIterator<HoodieRecord<RowData>> getRecordIterator(Schema 
readerSchema, Schema requestedSchema) throws IOException {
-    ClosableIterator<RowData> rowDataItr = 
getRowDataIterator(InternalSchemaManager.DISABLED, getRowType(), 
requestedSchema);
+    ClosableIterator<RowData> rowDataItr = 
getRowDataIterator(InternalSchemaManager.DISABLED, getRowType(), 
requestedSchema, Collections.emptyList());
     readerIterators.add(rowDataItr);
     return new CloseableMappingIterator<>(rowDataItr, HoodieFlinkRecord::new);
   }
@@ -89,12 +91,16 @@ public class HoodieRowDataParquetReader implements 
HoodieFileReader<RowData>  {
   @Override
   public ClosableIterator<String> getRecordKeyIterator() throws IOException {
     Schema schema = HoodieAvroUtils.getRecordKeySchema();
-    ClosableIterator<RowData> rowDataItr = 
getRowDataIterator(InternalSchemaManager.DISABLED, getRowType(), schema);
+    ClosableIterator<RowData> rowDataItr = 
getRowDataIterator(InternalSchemaManager.DISABLED, getRowType(), schema, 
Collections.emptyList());
     return new CloseableMappingIterator<>(rowDataItr, rowData -> 
Objects.toString(rowData.getString(0)));
   }
 
-  public ClosableIterator<RowData> getRowDataIterator(InternalSchemaManager 
internalSchemaManager, DataType dataType, Schema requestedSchema) throws 
IOException {
-    return RecordIterators.getParquetRecordIterator(storage.getConf(), 
internalSchemaManager, dataType, requestedSchema, path);
+  public ClosableIterator<RowData> getRowDataIterator(
+      InternalSchemaManager internalSchemaManager,
+      DataType dataType,
+      Schema requestedSchema,
+      List<Predicate> predicates) throws IOException {
+    return RecordIterators.getParquetRecordIterator(storage.getConf(), 
internalSchemaManager, dataType, requestedSchema, path, predicates);
   }
 
   @Override
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/RecordIterators.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/RecordIterators.java
index 8fd0f83197b..120e50e5388 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/RecordIterators.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/RecordIterators.java
@@ -43,7 +43,6 @@ import org.apache.parquet.hadoop.util.ConfigurationUtil;
 import org.apache.parquet.hadoop.util.SerializationUtil;
 
 import java.io.IOException;
-import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -64,7 +63,8 @@ public abstract class RecordIterators {
       InternalSchemaManager internalSchemaManager,
       DataType dataType,
       Schema requestedSchema,
-      StoragePath path) throws IOException {
+      StoragePath path,
+      List<Predicate> predicates) throws IOException {
     List<String> fieldNames = ((RowType) 
dataType.getLogicalType()).getFieldNames();
     List<DataType> fieldTypes = dataType.getChildren();
     int[] selectedFields = 
requestedSchema.getFields().stream().map(Schema.Field::name)
@@ -87,7 +87,7 @@ public abstract class RecordIterators {
         new org.apache.flink.core.fs.Path(path.toUri()),
         0L,
         Long.MAX_VALUE,
-        Collections.emptyList());
+        predicates);
   }
 
   public static ClosableIterator<RowData> getParquetRecordIterator(
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
index e8b766b4c46..84c905364ca 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
@@ -135,7 +135,7 @@ public class CdcInputFormat extends MergeOnReadInputFormat {
       return getBaseFileIteratorWithMetadata(split.getBasePath().get());
     } else if (!split.getBasePath().isPresent()) {
       // log files only
-      return new LogFileOnlyIterator(getFullLogFileIterator(split));
+      return getFullLogFileIterator(split);
     } else {
       Schema tableSchema = new 
Schema.Parser().parse(this.tableState.getAvroSchema());
       return new MergeIterator(
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
index dcd7a5f914e..ae35cee9616 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
@@ -34,6 +34,7 @@ import 
org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
 import org.apache.hudi.common.table.log.InstantRange;
 import org.apache.hudi.common.table.read.HoodieFileGroupReader;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -42,10 +43,8 @@ import org.apache.hudi.configuration.HadoopConfigurations;
 import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.internal.schema.InternalSchema;
-import org.apache.hudi.keygen.KeyGenUtils;
 import org.apache.hudi.source.ExpressionPredicates.Predicate;
 import org.apache.hudi.table.format.FilePathUtils;
 import org.apache.hudi.table.format.FlinkRowDataReaderContext;
@@ -58,11 +57,9 @@ import org.apache.hudi.util.FlinkWriteClients;
 import org.apache.hudi.util.RowDataProjection;
 import org.apache.hudi.util.RowDataToAvroConverters;
 import org.apache.hudi.util.StreamerUtil;
-import org.apache.hudi.util.StringToRowDataConverter;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
@@ -70,15 +67,12 @@ import org.apache.flink.api.common.io.RichInputFormat;
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.InputSplitAssigner;
-import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.types.RowKind;
 
 import java.io.IOException;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -91,7 +85,6 @@ import java.util.stream.IntStream;
 
 import static 
org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_COMMIT_TIME_COL_POS;
 import static 
org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS;
-import static org.apache.hudi.table.format.FormatUtils.buildAvroRecordBySchema;
 
 /**
  * The base InputFormat class to read from Hoodie data + log files.
@@ -208,51 +201,24 @@ public class MergeOnReadInputFormat
   }
 
   protected ClosableIterator<RowData> initIterator(MergeOnReadInputSplit 
split) throws IOException {
-    if (!(split.getLogPaths().isPresent() && split.getLogPaths().get().size() 
> 0)) {
-      if (split.getInstantRange().isPresent()) {
-        // base file only with commit time filtering
-        return new BaseFileOnlyFilteringIterator(
-            split.getInstantRange().get(),
-            this.tableState.getRequiredRowType(),
-            this.requiredPos,
-            getBaseFileIterator(split.getBasePath().get(), 
getRequiredPosWithCommitTime(this.requiredPos)));
-      } else {
-        // base file only
-        return getBaseFileIterator(split.getBasePath().get());
-      }
-    } else if (!split.getBasePath().isPresent()) {
-      // log files only
+    String mergeType = split.getMergeType();
+    if (!split.getBasePath().isPresent()) {
       if (OptionsResolver.emitDeletes(conf)) {
-        return new LogFileOnlyIterator(getUnMergedLogFileIterator(split));
+        mergeType = FlinkOptions.REALTIME_SKIP_MERGE;
       } else {
-        return new LogFileOnlyIterator(getLogFileIterator(split));
+        // always merge records in log files if there is no base file (aligned 
with legacy behaviour)
+        mergeType = FlinkOptions.REALTIME_PAYLOAD_COMBINE;
       }
-    } else if (split.getMergeType().equals(FlinkOptions.REALTIME_SKIP_MERGE)) {
-      return new SkipMergeIterator(
-          getBaseFileIterator(split.getBasePath().get()),
-          getLogFileIterator(split));
-    } else if 
(split.getMergeType().equals(FlinkOptions.REALTIME_PAYLOAD_COMBINE)) {
-      return new MergeIterator(
-          conf,
-          hadoopConf,
-          split,
-          this.tableState.getRowType(),
-          this.tableState.getRequiredRowType(),
-          new Schema.Parser().parse(this.tableState.getAvroSchema()),
-          new Schema.Parser().parse(this.tableState.getRequiredAvroSchema()),
-          internalSchemaManager.getQuerySchema(),
-          this.requiredPos,
-          this.emitDelete,
-          this.tableState.getOperationPos(),
-          getBaseFileIteratorWithMetadata(split.getBasePath().get()));
-    } else {
-      throw new HoodieException("Unable to select an Iterator to read the 
Hoodie MOR File Split for "
-          + "file path: " + split.getBasePath()
-          + "log paths: " + split.getLogPaths()
-          + "hoodie table path: " + split.getTablePath()
-          + "flink partition Index: " + split.getSplitNumber()
-          + "merge type: " + split.getMergeType());
     }
+    ValidationUtils.checkArgument(
+        mergeType.equals(FlinkOptions.REALTIME_SKIP_MERGE) || 
mergeType.equals(FlinkOptions.REALTIME_PAYLOAD_COMBINE),
+        "Unable to select an Iterator to read the Hoodie MOR File Split for "
+            + "file path: " + split.getBasePath()
+            + "log paths: " + split.getLogPaths()
+            + "hoodie table path: " + split.getTablePath()
+            + "flink partition Index: " + split.getSplitNumber()
+            + "merge type: " + split.getMergeType());
+    return getSplitIterator(split, mergeType);
   }
 
   @Override
@@ -363,93 +329,15 @@ public class MergeOnReadInputFormat
         predicates);
   }
 
-  private ClosableIterator<RowData> getLogFileIterator(MergeOnReadInputSplit 
split) {
-    final Schema tableSchema = new 
Schema.Parser().parse(tableState.getAvroSchema());
-    final Schema requiredSchema = new 
Schema.Parser().parse(tableState.getRequiredAvroSchema());
-    final GenericRecordBuilder recordBuilder = new 
GenericRecordBuilder(requiredSchema);
-    final AvroToRowDataConverters.AvroToRowDataConverter 
avroToRowDataConverter =
-        
AvroToRowDataConverters.createRowConverter(tableState.getRequiredRowType(), 
conf.getBoolean(FlinkOptions.READ_UTC_TIMEZONE));
-    final HoodieMergedLogRecordScanner scanner = FormatUtils.logScanner(split, 
tableSchema, internalSchemaManager.getQuerySchema(), conf, hadoopConf);
-    final Iterator<String> logRecordsKeyIterator = 
scanner.getRecords().keySet().iterator();
-    final int[] pkOffset = tableState.getPkOffsetsInRequired();
-    // flag saying whether the pk semantics has been dropped by user specified
-    // projections. For e.g, if the pk fields are [a, b] but user only select 
a,
-    // then the pk semantics is lost.
-    final boolean pkSemanticLost = Arrays.stream(pkOffset).anyMatch(offset -> 
offset == -1);
-    final LogicalType[] pkTypes = pkSemanticLost ? null : 
tableState.getPkTypes(pkOffset);
-    final StringToRowDataConverter converter = pkSemanticLost ? null : new 
StringToRowDataConverter(pkTypes);
-
-    return new ClosableIterator<RowData>() {
-      private RowData currentRecord;
-
-      @Override
-      public boolean hasNext() {
-        while (logRecordsKeyIterator.hasNext()) {
-          String curAvroKey = logRecordsKeyIterator.next();
-          Option<IndexedRecord> curAvroRecord = null;
-          final HoodieAvroRecord<?> hoodieRecord = (HoodieAvroRecord) 
scanner.getRecords().get(curAvroKey);
-          try {
-            curAvroRecord = hoodieRecord.getData().getInsertValue(tableSchema);
-          } catch (IOException e) {
-            throw new HoodieException("Get avro insert value error for key: " 
+ curAvroKey, e);
-          }
-          if (!curAvroRecord.isPresent()) {
-            // delete record found
-            if (emitDelete && !pkSemanticLost) {
-              GenericRowData delete = new 
GenericRowData(tableState.getRequiredRowType().getFieldCount());
-
-              final String recordKey = hoodieRecord.getRecordKey();
-              final String[] pkFields = 
KeyGenUtils.extractRecordKeys(recordKey);
-              final Object[] converted = converter.convert(pkFields);
-              for (int i = 0; i < pkOffset.length; i++) {
-                delete.setField(pkOffset[i], converted[i]);
-              }
-              delete.setRowKind(RowKind.DELETE);
-
-              this.currentRecord = delete;
-              return true;
-            }
-            // skipping if the condition is unsatisfied
-            // continue;
-          } else {
-            final IndexedRecord avroRecord = curAvroRecord.get();
-            final RowKind rowKind = FormatUtils.getRowKindSafely(avroRecord, 
tableState.getOperationPos());
-            if (rowKind == RowKind.DELETE && !emitDelete) {
-              // skip the delete record
-              continue;
-            }
-            GenericRecord requiredAvroRecord = buildAvroRecordBySchema(
-                avroRecord,
-                requiredSchema,
-                requiredPos,
-                recordBuilder);
-            currentRecord = (RowData) 
avroToRowDataConverter.convert(requiredAvroRecord);
-            currentRecord.setRowKind(rowKind);
-            return true;
-          }
-        }
-        return false;
-      }
-
-      @Override
-      public RowData next() {
-        return currentRecord;
-      }
-
-      @Override
-      public void close() {
-        scanner.close();
-      }
-    };
-  }
-
   /**
    * Get record iterator using {@link HoodieFileGroupReader}.
    *
    * @param split input split
-   * @return {@link RowData} iterator.
+   * @param mergeType merge type for FileGroup reader
+   *
+   * @return {@link RowData} iterator for the given split.
    */
-  private ClosableIterator<RowData> 
getUnMergedLogFileIterator(MergeOnReadInputSplit split) {
+  private ClosableIterator<RowData> getSplitIterator(MergeOnReadInputSplit 
split, String mergeType) throws IOException {
     final Schema tableSchema = new 
Schema.Parser().parse(tableState.getAvroSchema());
     final Schema requiredSchema = new 
Schema.Parser().parse(tableState.getRequiredAvroSchema());
 
@@ -466,11 +354,12 @@ public class MergeOnReadInputFormat
             HadoopFSUtils.getStorageConf(hadoopConf),
             () -> internalSchemaManager,
             predicates,
-            metaClient.getTableConfig());
+            metaClient.getTableConfig(),
+            split.getInstantRange());
     TypedProperties typedProps = 
FlinkClientUtil.getMergedTableAndWriteProps(metaClient.getTableConfig(), 
writeConfig);
-    typedProps.put(HoodieReaderConfig.MERGE_TYPE.key(), 
HoodieReaderConfig.REALTIME_SKIP_MERGE);
+    typedProps.put(HoodieReaderConfig.MERGE_TYPE.key(), mergeType);
 
-    try (HoodieFileGroupReader<RowData> fileGroupReader = 
HoodieFileGroupReader.<RowData>newBuilder()
+    HoodieFileGroupReader<RowData> fileGroupReader = 
HoodieFileGroupReader.<RowData>newBuilder()
         .withReaderContext(readerContext)
         .withHoodieTableMetaClient(metaClient)
         .withLatestCommitTime(split.getLatestCommit())
@@ -480,11 +369,9 @@ public class MergeOnReadInputFormat
         
.withInternalSchema(Option.ofNullable(internalSchemaManager.getQuerySchema()))
         .withProps(typedProps)
         .withShouldUseRecordPosition(false)
-        .build()) {
-      return fileGroupReader.getClosableIterator();
-    } catch (IOException e) {
-      throw new HoodieUpsertException("Failed to compact file slice: " + 
fileSlice, e);
-    }
+        .withEmitDelete(emitDelete)
+        .build();
+    return fileGroupReader.getClosableIterator();
   }
 
   protected static Option<IndexedRecord> getInsertVal(HoodieAvroRecord<?> 
hoodieRecord, Schema tableSchema) {
@@ -551,130 +438,6 @@ public class MergeOnReadInputFormat
   // -------------------------------------------------------------------------
   //  Inner Class
   // -------------------------------------------------------------------------
-  /**
-   * Base record iterator with instant time filtering.
-   */
-  static class BaseFileOnlyFilteringIterator implements 
ClosableIterator<RowData> {
-    // base file record iterator
-    private final ClosableIterator<RowData> nested;
-    private final InstantRange instantRange;
-    private final RowDataProjection projection;
-
-    private RowData currentRecord;
-
-    private int commitTimePos;
-
-    BaseFileOnlyFilteringIterator(
-        InstantRange instantRange,
-        RowType requiredRowType,
-        int[] requiredPos,
-        ClosableIterator<RowData> nested) {
-      this.nested = nested;
-      this.instantRange = instantRange;
-      this.commitTimePos = getCommitTimePos(requiredPos);
-      int[] positions;
-      if (commitTimePos < 0) {
-        commitTimePos = 0;
-        positions = IntStream.range(1, 1 + requiredPos.length).toArray();
-      } else {
-        positions = IntStream.range(0, requiredPos.length).toArray();
-      }
-      this.projection = RowDataProjection.instance(requiredRowType, positions);
-    }
-
-    @Override
-    public boolean hasNext() {
-      while (this.nested.hasNext()) {
-        currentRecord = this.nested.next();
-        boolean isInRange = 
instantRange.isInRange(currentRecord.getString(commitTimePos).toString());
-        if (isInRange) {
-          return true;
-        }
-      }
-      return false;
-    }
-
-    @Override
-    public RowData next() {
-      // can promote: no need to project with null instant range
-      return projection.project(currentRecord);
-    }
-
-    @Override
-    public void close() {
-      if (this.nested != null) {
-        this.nested.close();
-      }
-    }
-  }
-
-  protected static class LogFileOnlyIterator implements 
ClosableIterator<RowData> {
-    // iterator for log files
-    private final ClosableIterator<RowData> iterator;
-
-    public LogFileOnlyIterator(ClosableIterator<RowData> iterator) {
-      this.iterator = iterator;
-    }
-
-    @Override
-    public boolean hasNext() {
-      return this.iterator.hasNext();
-    }
-
-    @Override
-    public RowData next() {
-      return this.iterator.next();
-    }
-
-    @Override
-    public void close() {
-      if (this.iterator != null) {
-        this.iterator.close();
-      }
-    }
-  }
-
-  static class SkipMergeIterator implements ClosableIterator<RowData> {
-    // base file record iterator
-    private final ClosableIterator<RowData> nested;
-    // iterator for log files
-    private final ClosableIterator<RowData> iterator;
-
-    private RowData currentRecord;
-
-    SkipMergeIterator(ClosableIterator<RowData> nested, 
ClosableIterator<RowData> iterator) {
-      this.nested = nested;
-      this.iterator = iterator;
-    }
-
-    @Override
-    public boolean hasNext() {
-      if (this.nested.hasNext()) {
-        currentRecord = this.nested.next();
-        return true;
-      }
-      if (this.iterator.hasNext()) {
-        currentRecord = this.iterator.next();
-        return true;
-      }
-      return false;
-    }
-
-    @Override
-    public RowData next() {
-      return currentRecord;
-    }
-
-    @Override
-    public void close() {
-      if (this.nested != null) {
-        this.nested.close();
-      }
-      if (this.iterator != null) {
-        this.iterator.close();
-      }
-    }
-  }
 
   protected static class MergeIterator implements ClosableIterator<RowData> {
     // base file record iterator
@@ -703,26 +466,6 @@ public class MergeOnReadInputFormat
 
     private RowData currentRecord;
 
-    public MergeIterator(
-        Configuration flinkConf,
-        org.apache.hadoop.conf.Configuration hadoopConf,
-        MergeOnReadInputSplit split,
-        RowType tableRowType,
-        RowType requiredRowType,
-        Schema tableSchema,
-        Schema requiredSchema,
-        InternalSchema querySchema,
-        int[] requiredPos,
-        boolean emitDelete,
-        int operationPos,
-        ClosableIterator<RowData> nested) { // the iterator should be with 
full schema
-      this(flinkConf, hadoopConf, split, tableRowType, requiredRowType, 
tableSchema,
-          querySchema,
-          Option.of(RowDataProjection.instance(requiredRowType, requiredPos)),
-          Option.of(record -> buildAvroRecordBySchema(record, requiredSchema, 
requiredPos, new GenericRecordBuilder(requiredSchema))),
-          emitDelete, operationPos, nested);
-    }
-
     public MergeIterator(
         Configuration flinkConf,
         org.apache.hadoop.conf.Configuration hadoopConf,
@@ -905,25 +648,6 @@ public class MergeOnReadInputFormat
   //  Utilities
   // -------------------------------------------------------------------------
 
-  private static int[] getRequiredPosWithCommitTime(int[] requiredPos) {
-    if (getCommitTimePos(requiredPos) >= 0) {
-      return requiredPos;
-    }
-    int[] requiredPos2 = new int[requiredPos.length + 1];
-    requiredPos2[0] = HOODIE_COMMIT_TIME_COL_POS;
-    System.arraycopy(requiredPos, 0, requiredPos2, 1, requiredPos.length);
-    return requiredPos2;
-  }
-
-  private static int getCommitTimePos(int[] requiredPos) {
-    for (int i = 0; i < requiredPos.length; i++) {
-      if (requiredPos[i] == HOODIE_COMMIT_TIME_COL_POS) {
-        return i;
-      }
-    }
-    return -1;
-  }
-
   @VisibleForTesting
   public void isEmitDelete(boolean emitDelete) {
     this.emitDelete = emitDelete;
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java
index 36dfecbb79a..7f55b38b612 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java
@@ -20,11 +20,9 @@ package org.apache.hudi.table.format.mor;
 
 import org.apache.hudi.common.model.HoodieRecord;
 
-import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 
 import java.io.Serializable;
-import java.util.Arrays;
 import java.util.List;
 
 /**
@@ -89,29 +87,4 @@ public class MergeOnReadTableState implements Serializable {
         .mapToInt(i -> i)
         .toArray();
   }
-
-  /**
-   * Get the primary key positions in required row type.
-   */
-  public int[] getPkOffsetsInRequired() {
-    final List<String> fieldNames = requiredRowType.getFieldNames();
-    return Arrays.stream(pkFields)
-        .map(fieldNames::indexOf)
-        .mapToInt(i -> i)
-        .toArray();
-  }
-
-  /**
-   * Returns the primary key fields logical type with given offsets.
-   *
-   * @param pkOffsets the pk offsets in required row type
-   * @return pk field logical types
-   * @see #getPkOffsetsInRequired()
-   */
-  public LogicalType[] getPkTypes(int[] pkOffsets) {
-    final LogicalType[] requiredTypes = requiredRowType.getFields().stream()
-        .map(RowType.RowField::getType).toArray(LogicalType[]::new);
-    return Arrays.stream(pkOffsets).mapToObj(offset -> requiredTypes[offset])
-        .toArray(LogicalType[]::new);
-  }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StringToRowDataConverter.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StringToRowDataConverter.java
index 6c4aae3cd13..b9f1baf6482 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StringToRowDataConverter.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StringToRowDataConverter.java
@@ -22,10 +22,13 @@ import org.apache.hudi.common.util.ValidationUtils;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.types.logical.DecimalType;
 import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.TimestampType;
 
 import java.math.BigDecimal;
@@ -44,22 +47,26 @@ import static 
org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
 @Internal
 public class StringToRowDataConverter {
   private final Converter[] converters;
+  private final int[] fieldsPos;
+  private final int rowArity;
 
-  public StringToRowDataConverter(LogicalType[] fieldTypes) {
-    this.converters = Arrays.stream(fieldTypes)
-        .map(StringToRowDataConverter::getConverter)
+  public StringToRowDataConverter(int[] fieldsPos, RowType rowType) {
+    this.fieldsPos = fieldsPos;
+    this.rowArity = rowType.getFieldCount();
+    this.converters = Arrays.stream(fieldsPos)
+        .mapToObj(f -> getConverter(rowType.getTypeAt(f)))
         .toArray(Converter[]::new);
   }
 
-  public Object[] convert(String[] fields) {
+  public RowData convert(String[] fields) {
     ValidationUtils.checkArgument(converters.length == fields.length,
         "Field types and values should equal with number");
 
-    Object[] converted = new Object[fields.length];
+    GenericRowData rowData = new GenericRowData(rowArity);
     for (int i = 0; i < fields.length; i++) {
-      converted[i] = converters[i].convert(fields[i]);
+      rowData.setField(fieldsPos[i], converters[i].convert(fields[i]));
     }
-    return converted;
+    return rowData;
   }
 
   private interface Converter {
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java
index 0d3fde90155..e915581b1a4 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java
@@ -110,7 +110,8 @@ public class TestHoodieFileGroupReaderOnFlink extends 
TestHoodieFileGroupReaderB
         storageConf,
         () -> InternalSchemaManager.DISABLED,
         Collections.emptyList(),
-        metaClient.getTableConfig());
+        metaClient.getTableConfig(),
+        Option.empty());
   }
 
   @Override
@@ -164,7 +165,7 @@ public class TestHoodieFileGroupReaderOnFlink extends 
TestHoodieFileGroupReaderB
     HoodieTableConfig tableConfig = Mockito.mock(HoodieTableConfig.class);
     when(tableConfig.populateMetaFields()).thenReturn(true);
     FlinkRowDataReaderContext readerContext =
-        new FlinkRowDataReaderContext(getStorageConf(), () -> 
InternalSchemaManager.DISABLED, Collections.emptyList(), tableConfig);
+        new FlinkRowDataReaderContext(getStorageConf(), () -> 
InternalSchemaManager.DISABLED, Collections.emptyList(), tableConfig, 
Option.empty());
     Schema schema = SchemaBuilder.builder()
         .record("test")
         .fields()
@@ -182,7 +183,7 @@ public class TestHoodieFileGroupReaderOnFlink extends 
TestHoodieFileGroupReaderB
     HoodieTableConfig tableConfig = Mockito.mock(HoodieTableConfig.class);
     when(tableConfig.populateMetaFields()).thenReturn(true);
     FlinkRowDataReaderContext readerContext =
-        new FlinkRowDataReaderContext(getStorageConf(), () -> 
InternalSchemaManager.DISABLED, Collections.emptyList(), tableConfig);
+        new FlinkRowDataReaderContext(getStorageConf(), () -> 
InternalSchemaManager.DISABLED, Collections.emptyList(), tableConfig, 
Option.empty());
     Schema schema = SchemaBuilder.builder()
         .record("test")
         .fields()
@@ -200,7 +201,7 @@ public class TestHoodieFileGroupReaderOnFlink extends 
TestHoodieFileGroupReaderB
     when(tableConfig.populateMetaFields()).thenReturn(false);
     when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new String[] 
{"field1"}));
     FlinkRowDataReaderContext readerContext =
-        new FlinkRowDataReaderContext(getStorageConf(), () -> 
InternalSchemaManager.DISABLED, Collections.emptyList(), tableConfig);
+        new FlinkRowDataReaderContext(getStorageConf(), () -> 
InternalSchemaManager.DISABLED, Collections.emptyList(), tableConfig, 
Option.empty());
     Schema schema = SchemaBuilder.builder()
         .record("test")
         .fields()
@@ -218,7 +219,7 @@ public class TestHoodieFileGroupReaderOnFlink extends 
TestHoodieFileGroupReaderB
     when(tableConfig.populateMetaFields()).thenReturn(false);
     when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new String[] 
{"field1", "field2"}));
     FlinkRowDataReaderContext readerContext =
-        new FlinkRowDataReaderContext(getStorageConf(), () -> 
InternalSchemaManager.DISABLED, Collections.emptyList(), tableConfig);
+        new FlinkRowDataReaderContext(getStorageConf(), () -> 
InternalSchemaManager.DISABLED, Collections.emptyList(), tableConfig, 
Option.empty());
 
     Schema schema = SchemaBuilder.builder()
         .record("test")
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
index bbf7e0415bf..e5a497ee8ab 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
@@ -304,6 +304,31 @@ public class TestInputFormat {
     assertThat(actual2, is(expected));
   }
 
+  @Test
+  void testReadWithDeletes() throws Exception {
+    Map<String, String> options = new HashMap<>();
+    options.put(FlinkOptions.RECORD_KEY_FIELD.key(), "uuid");
+    beforeEach(HoodieTableType.MERGE_ON_READ, options);
+
+    // write another commit to read again
+    TestData.writeData(TestData.DATA_SET_UPDATE_DELETE, conf);
+
+    InputFormat<RowData, ?> inputFormat = this.tableSource.getInputFormat();
+    assertThat(inputFormat, instanceOf(MergeOnReadInputFormat.class));
+    ((MergeOnReadInputFormat) inputFormat).isEmitDelete(true);
+
+    List<RowData> result = readData(inputFormat);
+
+    final String actual = TestData.rowDataToString(result);
+    final String expected = "["
+        + "+I[id1, Danny, 24, 1970-01-01T00:00:00.001, par1], "
+        + "+I[id2, Stephen, 34, 1970-01-01T00:00:00.002, par1], "
+        + "-D[id3, null, null, null, null], "
+        + "-D[id5, null, null, null, null], "
+        + "-D[id9, null, null, null, null]]";
+    assertThat(actual, is(expected));
+  }
+
   @Test
   void testReadWithDeletesMOR() throws Exception {
     Map<String, String> options = new HashMap<>();
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index 1c671e371d9..ad8d2d4080c 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -982,6 +982,7 @@ public class TestData {
   /**
    * Returns the scanner to read avro log files.
    */
+  @Deprecated
   private static HoodieMergedLogRecordScanner getScanner(
       HoodieStorage storage,
       String basePath,
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestStringToRowDataConverter.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestStringToRowDataConverter.java
index 8f7ecad1384..b5f140b72e2 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestStringToRowDataConverter.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestStringToRowDataConverter.java
@@ -27,6 +27,7 @@ import org.apache.avro.generic.GenericRecord;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.data.DecimalData;
 import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
@@ -41,7 +42,7 @@ import java.time.temporal.ChronoField;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 
 /**
  * Test cases for {@link StringToRowDataConverter}.
@@ -59,8 +60,9 @@ public class TestStringToRowDataConverter {
         DataTypes.TIMESTAMP(6).getLogicalType(),
         DataTypes.DECIMAL(7, 2).getLogicalType()
     };
-    StringToRowDataConverter converter = new 
StringToRowDataConverter(fieldTypes);
-    Object[] converted = converter.convert(fields);
+    RowType rowType = RowType.of(fieldTypes);
+    StringToRowDataConverter converter = new StringToRowDataConverter(new 
int[]{0, 1, 2, 3, 4, 5, 6}, rowType);
+    RowData actual = converter.convert(fields);
     Object[] expected = new Object[] {
         1.1f, 3.4D, (int) LocalDate.parse("2021-03-30").toEpochDay(),
         LocalTime.parse("15:44:29").get(ChronoField.MILLI_OF_DAY),
@@ -68,7 +70,8 @@ public class TestStringToRowDataConverter {
         
TimestampData.fromInstant(Instant.parse("2021-03-30T15:44:29.666111Z")),
         DecimalData.fromBigDecimal(new BigDecimal("12345.67"), 7, 2)
     };
-    assertArrayEquals(expected, converted);
+    GenericRowData expectedRow = GenericRowData.of(expected);
+    assertEquals(expectedRow, actual);
   }
 
   @Test
@@ -97,15 +100,10 @@ public class TestStringToRowDataConverter {
     GenericRecord avroRecord =
         (GenericRecord) 
converter.convert(AvroSchemaConverter.convertToSchema(rowType), rowData);
     StringToRowDataConverter stringToRowDataConverter =
-        new StringToRowDataConverter(rowType.getChildren().toArray(new 
LogicalType[0]));
+        new StringToRowDataConverter(new int[]{0, 1, 2, 3, 4, 5, 6}, rowType);
     final String recordKey = KeyGenUtils.getRecordKey(avroRecord, 
rowType.getFieldNames(), false);
     final String[] recordKeys = KeyGenUtils.extractRecordKeys(recordKey);
-    Object[] convertedKeys = stringToRowDataConverter.convert(recordKeys);
-
-    GenericRowData converted = new GenericRowData(7);
-    for (int i = 0; i < 7; i++) {
-      converted.setField(i, convertedKeys[i]);
-    }
+    RowData converted = stringToRowDataConverter.convert(recordKeys);
     assertThat(converted, is(rowData));
   }
 }
diff --git 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
index fa051360b3f..8941d6bed34 100644
--- 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
+++ 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
@@ -177,6 +177,11 @@ public class HiveHoodieReaderContext extends 
HoodieReaderContext<ArrayWritable>
     return objectInspectorCache.serialize(record, schema);
   }
 
+  @Override
+  public ArrayWritable getDeleteRow(ArrayWritable record, String recordKey) {
+    throw new UnsupportedOperationException("Not supported for " + 
this.getClass().getSimpleName());
+  }
+
   @Override
   public Option<HoodieRecordMerger> getRecordMerger(RecordMergeMode mergeMode, 
String mergeStrategyId, String mergeImplClasses) {
     // TODO(HUDI-7843):
@@ -202,6 +207,11 @@ public class HiveHoodieReaderContext extends 
HoodieReaderContext<ArrayWritable>
     return StringUtils.isNullOrEmpty(fieldName) ? null : 
objectInspectorCache.getValue(record, schema, fieldName);
   }
 
+  @Override
+  public String getMetaFieldValue(ArrayWritable record, int pos) {
+    return record.get()[pos].toString();
+  }
+
   @Override
   public boolean castToBoolean(Object value) {
     if (value instanceof BooleanWritable) {
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestPositionBasedFileGroupRecordBuffer.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestPositionBasedFileGroupRecordBuffer.java
index 47e7f42d12d..4211b24f16c 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestPositionBasedFileGroupRecordBuffer.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestPositionBasedFileGroupRecordBuffer.java
@@ -138,7 +138,8 @@ public class TestPositionBasedFileGroupRecordBuffer extends 
TestHoodieFileGroupR
         baseFileInstantTime,
         props,
         readStats,
-        Option.of("timestamp"));
+        Option.of("timestamp"),
+        false);
   }
 
   public Map<HoodieLogBlock.HeaderMetadataType, String> getHeader(boolean 
shouldWriteRecordPositions,

Reply via email to