This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 33dd7afcc7a [HUDI-9022] Handle records with custom delete markers in FG reader (#12843)
33dd7afcc7a is described below

commit 33dd7afcc7a23c68b7ce4bdfaeab6ec596d0c742
Author: Lin Liu <[email protected]>
AuthorDate: Tue Mar 25 10:27:50 2025 -0700

    [HUDI-9022] Handle records with custom delete markers in FG reader (#12843)
    
    Co-authored-by: Y Ethan Guo <[email protected]>
---
 .../SparkFileFormatInternalRowReaderContext.scala  |   2 +-
 .../SparkClientFunctionalTestHarness.java          |  11 +-
 .../hudi/common/engine/HoodieReaderContext.java    |  21 +-
 .../table/log/BaseHoodieLogRecordReader.java       |   6 +-
 .../table/log/HoodieMergedLogRecordReader.java     |   8 +-
 ...dler.java => FileGroupReaderSchemaHandler.java} | 123 ++++++--
 ...ecordBuffer.java => FileGroupRecordBuffer.java} |  58 +++-
 .../common/table/read/HoodieFileGroupReader.java   |  34 +--
 ...fer.java => KeyBasedFileGroupRecordBuffer.java} |  42 ++-
 ...ava => PositionBasedFileGroupRecordBuffer.java} |  44 ++-
 ...andler.java => PositionBasedSchemaHandler.java} |  18 +-
 ...fer.java => UnmergedFileGroupRecordBuffer.java} |   4 +-
 .../table/read/TestFileGroupRecordBuffer.java      | 328 +++++++++++++++++++++
 .../table/read/TestHoodieFileGroupReaderBase.java  |   2 +-
 .../read/TestHoodieFileGroupRecordBuffer.java      | 141 ---------
 .../hudi/hadoop/HiveHoodieReaderContext.java       |  10 +
 ...stSparkFileFormatInternalRowReaderContext.scala |   2 +-
 ...=> TestPositionBasedFileGroupRecordBuffer.java} |  12 +-
 .../read/TestHoodieFileGroupReaderOnSpark.scala    | 145 ++++++++-
 19 files changed, 747 insertions(+), 264 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
index fb1913bf80e..7eb154ddd0a 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
@@ -25,7 +25,7 @@ import org.apache.hudi.avro.AvroSchemaUtils.isNullable
 import org.apache.hudi.common.engine.HoodieReaderContext
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.HoodieRecord
-import org.apache.hudi.common.table.read.HoodiePositionBasedFileGroupRecordBuffer.ROW_INDEX_TEMPORARY_COLUMN_NAME
+import org.apache.hudi.common.table.read.PositionBasedFileGroupRecordBuffer.ROW_INDEX_TEMPORARY_COLUMN_NAME
 import org.apache.hudi.common.util.ValidationUtils.checkState
 import org.apache.hudi.common.util.collection.{CachingIterator, ClosableIterator}
 import org.apache.hudi.io.storage.{HoodieSparkFileReaderFactory, HoodieSparkParquetReader}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
index aba8573073c..97c79534787 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
@@ -447,7 +447,9 @@ public class SparkClientFunctionalTestHarness implements SparkProvider, HoodieMe
    * @param validateColumns columns to validate
    * @return true if dataframes are equal, false otherwise
    */
-  public static boolean areDataframesEqual(Dataset<Row> expectedDf, Dataset<Row> actualDf, Set<String> validateColumns) {
+  public static boolean areDataframesEqual(Dataset<Row> expectedDf,
+                                           Dataset<Row> actualDf,
+                                           Set<String> validateColumns) {
     // Normalize schema order
     String[] sortedColumnNames = Arrays.stream(expectedDf.columns())
         .filter(validateColumns::contains).sorted().toArray(String[]::new);
@@ -456,11 +458,8 @@ public class SparkClientFunctionalTestHarness implements SparkProvider, HoodieMe
     Dataset<Row> df1Normalized = expectedDf.selectExpr(sortedColumnNames);
     Dataset<Row> df2Normalized = actualDf.selectExpr(sortedColumnNames);
 
-    // Sort rows
-    Dataset<Row> df1Sorted = df1Normalized.sort("_row_key");
-    Dataset<Row> df2Sorted = df2Normalized.sort("_row_key");
-
     // Check for differences
-    return df1Sorted.except(df2Sorted).isEmpty() && df2Sorted.except(df1Sorted).isEmpty();
+    return df1Normalized.except(df2Normalized).isEmpty()
+        && df2Normalized.except(df1Normalized).isEmpty();
   }
 }
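
Side note on the harness change above: the explicit `_row_key` sort was dropped because `Dataset.except` is a set difference, so row order cannot affect the outcome. A minimal, self-contained sketch of that property (the session setup, class name, and data are illustrative, not part of this commit):

    import static org.apache.spark.sql.functions.col;

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class ExceptOrderSketch {
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().master("local[1]").appName("except-sketch").getOrCreate();
        Dataset<Row> a = spark.range(3).toDF("id");
        // Same contents, deliberately reversed order.
        Dataset<Row> b = spark.range(3).toDF("id").orderBy(col("id").desc());
        // Set difference in both directions; row order plays no role.
        boolean equal = a.except(b).isEmpty() && b.except(a).isEmpty();
        System.out.println(equal); // true
        spark.stop();
      }
    }
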
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
index d2103513f8d..2751998b111 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
@@ -22,7 +22,7 @@ package org.apache.hudi.common.engine;
 import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordMerger;
-import org.apache.hudi.common.table.read.HoodieFileGroupReaderSchemaHandler;
+import org.apache.hudi.common.table.read.FileGroupReaderSchemaHandler;
 import org.apache.hudi.common.util.LocalAvroSchemaCache;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ClosableIterator;
@@ -59,7 +59,7 @@ import static org.apache.hudi.common.model.HoodieRecord.RECORD_KEY_METADATA_FIEL
  */
 public abstract class HoodieReaderContext<T> implements Closeable {
 
-  private HoodieFileGroupReaderSchemaHandler<T> schemaHandler = null;
+  private FileGroupReaderSchemaHandler<T> schemaHandler = null;
   private String tablePath = null;
   private String latestCommitTime = null;
   private Option<HoodieRecordMerger> recordMerger = null;
@@ -72,11 +72,11 @@ public abstract class HoodieReaderContext<T> implements Closeable {
   private final LocalAvroSchemaCache localAvroSchemaCache = LocalAvroSchemaCache.getInstance();
 
   // Getter and Setter for schemaHandler
-  public HoodieFileGroupReaderSchemaHandler<T> getSchemaHandler() {
+  public FileGroupReaderSchemaHandler<T> getSchemaHandler() {
     return schemaHandler;
   }
 
-  public void setSchemaHandler(HoodieFileGroupReaderSchemaHandler<T> schemaHandler) {
+  public void setSchemaHandler(FileGroupReaderSchemaHandler<T> schemaHandler) {
     this.schemaHandler = schemaHandler;
   }
 
@@ -215,6 +215,19 @@ public abstract class HoodieReaderContext<T> implements Closeable {
    */
   public abstract Object getValue(T record, Schema schema, String fieldName);
 
+  /**
+   * Cast to Java boolean value.
+   * If the object is not compatible with boolean type, throws.
+   */
+  public boolean castToBoolean(Object value) {
+    if (value instanceof Boolean) {
+      return (boolean) value;
+    } else {
+      throw new IllegalArgumentException(
+          "Input value type " + value.getClass() + ", cannot be cast to 
boolean");
+    }
+  }
+
   /**
    * Gets the record key in String.
    *
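
Note on the `castToBoolean` hook added above: it deliberately refuses to coerce anything that is not already a `java.lang.Boolean`, so engine-specific reader contexts can override it for their own boolean encodings. A standalone sketch of that contract (the class name and sample values are illustrative, not from the commit):

    public class CastToBooleanSketch {
      // Mirrors the default castToBoolean above: unboxing only, no coercion.
      static boolean castToBoolean(Object value) {
        if (value instanceof Boolean) {
          return (boolean) value;
        }
        throw new IllegalArgumentException(
            "Input value type " + value.getClass() + ", cannot be cast to boolean");
      }

      public static void main(String[] args) {
        System.out.println(castToBoolean(Boolean.TRUE)); // true
        try {
          castToBoolean("true"); // a String is rejected, not parsed
        } catch (IllegalArgumentException e) {
          System.out.println(e.getMessage());
        }
      }
    }
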
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
index 346472411bb..b74a0c9b92f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
@@ -30,7 +30,7 @@ import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
 import org.apache.hudi.common.table.log.block.HoodieDataBlock;
 import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock;
-import org.apache.hudi.common.table.read.HoodieFileGroupRecordBuffer;
+import org.apache.hudi.common.table.read.FileGroupRecordBuffer;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
@@ -135,7 +135,7 @@ public abstract class BaseHoodieLogRecordReader<T> {
   private final List<String> validBlockInstants = new ArrayList<>();
   // Use scanV2 method.
   private final boolean enableOptimizedLogBlocksScan;
-  protected HoodieFileGroupRecordBuffer<T> recordBuffer;
+  protected FileGroupRecordBuffer<T> recordBuffer;
 
   protected BaseHoodieLogRecordReader(HoodieReaderContext readerContext,
                                       HoodieStorage storage,
@@ -145,7 +145,7 @@ public abstract class BaseHoodieLogRecordReader<T> {
                                       Option<String> partitionNameOverride,
                                       Option<String> keyFieldOverride,
                                       boolean enableOptimizedLogBlocksScan,
-                                      HoodieFileGroupRecordBuffer<T> recordBuffer) {
+                                      FileGroupRecordBuffer<T> recordBuffer) {
     this.readerContext = readerContext;
     this.readerSchema = readerContext.getSchemaHandler().getRequiredSchema();
     this.latestInstantTime = readerContext.getLatestCommitTime();
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java
index 6c4dbf936f3..cffe48b6a24 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java
@@ -21,7 +21,7 @@ package org.apache.hudi.common.table.log;
 
 import org.apache.hudi.common.engine.HoodieReaderContext;
 import org.apache.hudi.common.model.HoodieLogFile;
-import org.apache.hudi.common.table.read.HoodieFileGroupRecordBuffer;
+import org.apache.hudi.common.table.read.FileGroupRecordBuffer;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.Option;
@@ -70,7 +70,7 @@ public class HoodieMergedLogRecordReader<T> extends BaseHoodieLogRecordReader<T>
                                       Option<String> partitionName,
                                       Option<String> keyFieldOverride,
                                       boolean enableOptimizedLogBlocksScan,
-                                      HoodieFileGroupRecordBuffer<T> recordBuffer) {
+                                      FileGroupRecordBuffer<T> recordBuffer) {
     super(readerContext, storage, logFilePaths, reverseReader, bufferSize, instantRange, withOperationField,
         forceFullScan, partitionName, keyFieldOverride, enableOptimizedLogBlocksScan, recordBuffer);
     this.scannedPrefixes = new HashSet<>();
@@ -219,7 +219,7 @@ public class HoodieMergedLogRecordReader<T> extends BaseHoodieLogRecordReader<T>
     private boolean forceFullScan = true;
     private boolean enableOptimizedLogBlocksScan = false;
 
-    private HoodieFileGroupRecordBuffer<T> recordBuffer;
+    private FileGroupRecordBuffer<T> recordBuffer;
 
     @Override
    public Builder<T> withHoodieReaderContext(HoodieReaderContext<T> readerContext) {
@@ -287,7 +287,7 @@ public class HoodieMergedLogRecordReader<T> extends BaseHoodieLogRecordReader<T>
       return this;
     }
 
-    public Builder<T> withRecordBuffer(HoodieFileGroupRecordBuffer<T> recordBuffer) {
+    public Builder<T> withRecordBuffer(FileGroupRecordBuffer<T> recordBuffer) {
       this.recordBuffer = recordBuffer;
       return this;
     }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderSchemaHandler.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderSchemaHandler.java
similarity index 65%
rename from hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderSchemaHandler.java
rename to hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderSchemaHandler.java
index 15f7b161518..a73398f2240 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderSchemaHandler.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderSchemaHandler.java
@@ -26,8 +26,8 @@ import org.apache.hudi.common.engine.HoodieReaderContext;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.table.HoodieTableConfig;
-import org.apache.hudi.common.util.LocalAvroSchemaCache;
 import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.util.LocalAvroSchemaCache;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.VisibleForTesting;
@@ -41,9 +41,11 @@ import org.apache.avro.Schema;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.function.UnaryOperator;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -51,13 +53,16 @@ import java.util.stream.Stream;
 import static org.apache.hudi.avro.AvroSchemaUtils.appendFieldsToSchemaDedupNested;
 import static org.apache.hudi.avro.AvroSchemaUtils.createNewSchemaFromFieldsWithReference;
 import static org.apache.hudi.avro.AvroSchemaUtils.findNestedField;
+import static org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_KEY;
+import static org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_MARKER;
+import static org.apache.hudi.common.model.HoodieRecord.HOODIE_IS_DELETED_FIELD;
 
 /**
  * This class is responsible for handling the schema for the file group reader.
  */
-public class HoodieFileGroupReaderSchemaHandler<T> {
+public class FileGroupReaderSchemaHandler<T> {
 
-  protected final Schema dataSchema;
+  protected final Schema tableSchema;
 
   // requestedSchema: the schema that the caller requests
   protected final Schema requestedSchema;
@@ -84,20 +89,26 @@ public class HoodieFileGroupReaderSchemaHandler<T> {
 
   private final LocalAvroSchemaCache localAvroSchemaCache;
 
-  public HoodieFileGroupReaderSchemaHandler(HoodieReaderContext<T> readerContext,
-                                            Schema dataSchema,
-                                            Schema requestedSchema,
-                                            Option<InternalSchema> internalSchemaOpt,
-                                            HoodieTableConfig hoodieTableConfig,
-                                            TypedProperties properties) {
+  private final Option<Pair<String, String>> customDeleteMarkerKeyValue;
+  private final boolean hasBuiltInDelete;
+
+  public FileGroupReaderSchemaHandler(HoodieReaderContext<T> readerContext,
+                                      Schema tableSchema,
+                                      Schema requestedSchema,
+                                      Option<InternalSchema> internalSchemaOpt,
+                                      HoodieTableConfig hoodieTableConfig,
+                                      TypedProperties properties) {
     this.properties = properties;
     this.readerContext = readerContext;
     this.hasBootstrapBaseFile = readerContext.getHasBootstrapBaseFile();
     this.needsMORMerge = readerContext.getHasLogFiles();
     this.recordMerger = readerContext.getRecordMerger();
-    this.dataSchema = dataSchema;
+    this.tableSchema = tableSchema;
     this.requestedSchema = AvroSchemaCache.intern(requestedSchema);
     this.hoodieTableConfig = hoodieTableConfig;
+    Pair<Option<Pair<String, String>>, Boolean> deleteConfigs = getDeleteConfigs(properties, tableSchema);
+    this.customDeleteMarkerKeyValue = deleteConfigs.getLeft();
+    this.hasBuiltInDelete = deleteConfigs.getRight();
     this.requiredSchema = AvroSchemaCache.intern(prepareRequiredSchema());
     this.internalSchema = pruneInternalSchema(requiredSchema, internalSchemaOpt);
     this.internalSchemaOpt = getInternalSchemaOpt(internalSchemaOpt);
@@ -105,8 +116,8 @@ public class HoodieFileGroupReaderSchemaHandler<T> {
     this.localAvroSchemaCache = LocalAvroSchemaCache.getInstance();
   }
 
-  public Schema getDataSchema() {
-    return this.dataSchema;
+  public Schema getTableSchema() {
+    return this.tableSchema;
   }
 
   public Schema getRequestedSchema() {
@@ -132,6 +143,14 @@ public class HoodieFileGroupReaderSchemaHandler<T> {
     return Option.empty();
   }
 
+  public Option<Pair<String, String>> getCustomDeleteMarkerKeyValue() {
+    return customDeleteMarkerKeyValue;
+  }
+
+  public boolean hasBuiltInDelete() {
+    return hasBuiltInDelete;
+  }
+
   private InternalSchema pruneInternalSchema(Schema requiredSchema, Option<InternalSchema> internalSchemaOption) {
     if (!internalSchemaOption.isPresent()) {
       return InternalSchema.getEmptyInternalSchema();
@@ -161,14 +180,16 @@ public class HoodieFileGroupReaderSchemaHandler<T> {
 
     if (hoodieTableConfig.getRecordMergeMode() == RecordMergeMode.CUSTOM) {
       if (!recordMerger.get().isProjectionCompatible()) {
-        return dataSchema;
+        return tableSchema;
       }
     }
 
     List<Schema.Field> addedFields = new ArrayList<>();
-    for (String field : getMandatoryFieldsForMerging(hoodieTableConfig, properties, dataSchema, recordMerger)) {
+    for (String field : getMandatoryFieldsForMerging(
+        hoodieTableConfig, properties, tableSchema, recordMerger,
+        hasBuiltInDelete, customDeleteMarkerKeyValue)) {
       if (!findNestedField(requestedSchema, field).isPresent()) {
-        Option<Schema.Field> foundFieldOpt  = findNestedField(dataSchema, field);
+        Option<Schema.Field> foundFieldOpt = findNestedField(tableSchema, field);
         if (!foundFieldOpt.isPresent()) {
           throw new IllegalArgumentException("Field: " + field + " does not exist in the table schema");
         }
@@ -184,8 +205,12 @@ public class HoodieFileGroupReaderSchemaHandler<T> {
     return appendFieldsToSchemaDedupNested(requestedSchema, addedFields);
   }
 
-  private static String[] getMandatoryFieldsForMerging(HoodieTableConfig cfg, TypedProperties props,
-                                               Schema dataSchema, Option<HoodieRecordMerger> recordMerger) {
+  private static String[] getMandatoryFieldsForMerging(HoodieTableConfig cfg,
+                                                       TypedProperties props,
+                                                       Schema tableSchema,
+                                                       Option<HoodieRecordMerger> recordMerger,
+                                                       boolean hasBuiltInDelete,
+                                                       Option<Pair<String, String>> customDeleteMarkerKeyAndValue) {
     Triple<RecordMergeMode, String, String> mergingConfigs = HoodieTableConfig.inferCorrectMergingBehavior(
         cfg.getRecordMergeMode(),
         cfg.getPayloadClass(),
@@ -194,11 +219,12 @@ public class HoodieFileGroupReaderSchemaHandler<T> {
         HoodieTableVersion.current());
 
     if (mergingConfigs.getLeft() == RecordMergeMode.CUSTOM) {
-      return recordMerger.get().getMandatoryFieldsForMerging(dataSchema, cfg, props);
+      return recordMerger.get().getMandatoryFieldsForMerging(tableSchema, cfg, props);
     }
 
-    ArrayList<String> requiredFields = new ArrayList<>();
-
+    // Use Set to avoid duplicated fields.
+    Set<String> requiredFields = new HashSet<>();
+    // Add record key fields.
     if (cfg.populateMetaFields()) {
       requiredFields.add(HoodieRecord.RECORD_KEY_METADATA_FIELD);
     } else {
@@ -207,17 +233,22 @@ public class HoodieFileGroupReaderSchemaHandler<T> {
         requiredFields.addAll(Arrays.asList(fields.get()));
       }
     }
-
+    // Add precombine field for event time ordering merge mode.
     if (mergingConfigs.getLeft() == RecordMergeMode.EVENT_TIME_ORDERING) {
       String preCombine = cfg.getPreCombineField();
       if (!StringUtils.isNullOrEmpty(preCombine)) {
         requiredFields.add(preCombine);
       }
     }
-
-    if (dataSchema.getField(HoodieRecord.HOODIE_IS_DELETED_FIELD) != null) {
+    // Add `HOODIE_IS_DELETED_FIELD` field if exists.
+    if (hasBuiltInDelete) {
       requiredFields.add(HoodieRecord.HOODIE_IS_DELETED_FIELD);
     }
+    // Add custom delete key field if exists.
+    if (customDeleteMarkerKeyAndValue.isPresent()) {
+      requiredFields.add(customDeleteMarkerKeyAndValue.get().getLeft());
+    }
+
     return requiredFields.toArray(new String[0]);
   }
 
@@ -235,14 +266,14 @@ public class HoodieFileGroupReaderSchemaHandler<T> {
   }
 
   public Pair<List<Schema.Field>,List<Schema.Field>> getBootstrapDataFields() {
-    return getDataAndMetaCols(dataSchema);
+    return getDataAndMetaCols(tableSchema);
   }
 
   @VisibleForTesting
   static Pair<List<Schema.Field>, List<Schema.Field>> getDataAndMetaCols(Schema schema) {
     Map<Boolean, List<Schema.Field>> fieldsByMeta = schema.getFields().stream()
         //if there are no data fields, then we don't want to think the temp col is a data col
-        .filter(f -> !Objects.equals(f.name(), HoodiePositionBasedFileGroupRecordBuffer.ROW_INDEX_TEMPORARY_COLUMN_NAME))
+        .filter(f -> !Objects.equals(f.name(), PositionBasedFileGroupRecordBuffer.ROW_INDEX_TEMPORARY_COLUMN_NAME))
         .collect(Collectors.partitioningBy(f -> HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION.contains(f.name())));
     return Pair.of(fieldsByMeta.getOrDefault(true, Collections.emptyList()),
         fieldsByMeta.getOrDefault(false, Collections.emptyList()));
@@ -254,6 +285,46 @@ public class HoodieFileGroupReaderSchemaHandler<T> {
       Schema.Field curr = fields.get(i);
       fields.set(i, new Schema.Field(curr.name(), curr.schema(), curr.doc(), curr.defaultVal()));
     }
-    return createNewSchemaFromFieldsWithReference(dataSchema, fields);
+    return createNewSchemaFromFieldsWithReference(tableSchema, fields);
+  }
+
+  /**
+   * Fetches the delete configs from the configs.
+   *
+   * @param props write and table configs that contain delete related properties
+   * @param tableSchema table schema
+   * @return a pair of custom delete marker key, value, and whether built-in delete marker
+   * (`_hoodie_is_deleted`) is included.
+   */
+  private static Pair<Option<Pair<String, String>>, Boolean> getDeleteConfigs(TypedProperties props,
+                                                                              Schema tableSchema) {
+    String deleteKey = props.getProperty(DELETE_KEY);
+    String deleteMarker = props.getProperty(DELETE_MARKER);
+    boolean deleteKeyExists = !StringUtils.isNullOrEmpty(deleteKey);
+    boolean deleteMarkerExists = !StringUtils.isNullOrEmpty(deleteMarker);
+
+    Option<Pair<String, String>> customDeleteMarkerKeyAndValue;
+    // DELETE_KEY and DELETE_MARKER both should be set.
+    if (deleteKeyExists && deleteMarkerExists) {
+      // DELETE_KEY field exists in the schema.
+      customDeleteMarkerKeyAndValue = Option.of(Pair.of(deleteKey, deleteMarker));
+    } else if (!deleteKeyExists && !deleteMarkerExists) {
+      // Normal case.
+      customDeleteMarkerKeyAndValue = Option.empty();
+    } else {
+      throw new IllegalArgumentException("Either custom delete key or marker 
is not specified");
+    }
+    return Pair.of(customDeleteMarkerKeyAndValue, 
hasBuiltInDeleteField(tableSchema));
+  }
+
+  /**
+   * Check if "_hoodie_is_deleted" field (built-in deletes) exists in the schema.
+   * Assume the type of this column is boolean.
+   *
+   * @param schema table schema to check
+   * @return whether built-in delete field is included in the table schema
+   */
+  private static boolean hasBuiltInDeleteField(Schema schema) {
+    return schema.getField(HOODIE_IS_DELETED_FIELD) != null;
   }
 }
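
For context, `DELETE_KEY` and `DELETE_MARKER` are the `DefaultHoodieRecordPayload` properties (`hoodie.payload.delete.field` and `hoodie.payload.delete.marker`, respectively). A standalone sketch of the pairing rule that `getDeleteConfigs` enforces, using plain `java.util.Properties` in place of `TypedProperties` (the column name and marker value are stand-ins, not the committed code):

    import java.util.Properties;

    public class DeleteConfigSketch {
      public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("hoodie.payload.delete.field", "op"); // DELETE_KEY
        props.setProperty("hoodie.payload.delete.marker", "D"); // DELETE_MARKER

        String deleteKey = props.getProperty("hoodie.payload.delete.field");
        String deleteMarker = props.getProperty("hoodie.payload.delete.marker");
        boolean keySet = deleteKey != null && !deleteKey.isEmpty();
        boolean markerSet = deleteMarker != null && !deleteMarker.isEmpty();
        if (keySet != markerSet) {
          // Only one of the two is set: the schema handler fails fast.
          throw new IllegalArgumentException("Either custom delete key or marker is not specified");
        }
        // Both set: rows whose "op" column equals "D" are treated as deletes.
        // Neither set: no custom delete marker, the normal case.
        System.out.println(keySet ? deleteKey + " -> " + deleteMarker : "no custom delete marker");
      }
    }
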
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java
similarity index 93%
rename from hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
rename to hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java
index 528c2083136..38ac956e07b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java
@@ -79,7 +79,7 @@ import static org.apache.hudi.common.model.HoodieRecord.OPERATION_METADATA_FIELD
 import static org.apache.hudi.common.model.HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID;
 import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.INSTANT_TIME;
 
-public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGroupRecordBuffer<T> {
+public abstract class FileGroupRecordBuffer<T> implements HoodieFileGroupRecordBuffer<T> {
   protected final HoodieReaderContext<T> readerContext;
   protected final Schema readerSchema;
   protected final Option<String> orderingFieldName;
@@ -91,6 +91,8 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGr
   protected final TypedProperties props;
   protected final ExternalSpillableMap<Serializable, Pair<Option<T>, Map<String, Object>>> records;
   protected final HoodieReadStats readStats;
+  protected final boolean shouldCheckCustomDeleteMarker;
+  protected final boolean shouldCheckBuiltInDeleteMarker;
   protected ClosableIterator<T> baseFileIterator;
   protected Iterator<Pair<Option<T>, Map<String, Object>>> logRecordIterator;
   protected T nextRecord;
@@ -98,13 +100,13 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGr
   protected InternalSchema internalSchema;
   protected HoodieTableMetaClient hoodieTableMetaClient;
 
-  public HoodieBaseFileGroupRecordBuffer(HoodieReaderContext<T> readerContext,
-                                         HoodieTableMetaClient hoodieTableMetaClient,
-                                         RecordMergeMode recordMergeMode,
-                                         Option<String> partitionNameOverrideOpt,
-                                         Option<String[]> partitionPathFieldOpt,
-                                         TypedProperties props,
-                                         HoodieReadStats readStats) {
+  protected FileGroupRecordBuffer(HoodieReaderContext<T> readerContext,
+                                  HoodieTableMetaClient hoodieTableMetaClient,
+                                  RecordMergeMode recordMergeMode,
+                                  Option<String> partitionNameOverrideOpt,
+                                  Option<String[]> partitionPathFieldOpt,
+                                  TypedProperties props,
+                                  HoodieReadStats readStats) {
     this.readerContext = readerContext;
     this.readerSchema = AvroSchemaCache.intern(readerContext.getSchemaHandler().getRequiredSchema());
     this.partitionNameOverrideOpt = partitionNameOverrideOpt;
@@ -143,6 +145,40 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGr
     } catch (IOException e) {
       throw new HoodieIOException("IOException when creating ExternalSpillableMap at " + spillableMapBasePath, e);
     }
+    this.shouldCheckCustomDeleteMarker =
+        readerContext.getSchemaHandler().getCustomDeleteMarkerKeyValue().isPresent();
+    this.shouldCheckBuiltInDeleteMarker =
+        readerContext.getSchemaHandler().hasBuiltInDelete();
+  }
+
+  /**
+   * Here we assume that delete marker column type is of string.
+   * This should be sufficient for most cases.
+   */
+  protected final boolean isCustomDeleteRecord(T record) {
+    if (!shouldCheckCustomDeleteMarker) {
+      return false;
+    }
+
+    Pair<String, String> markerKeyValue =
+        readerContext.getSchemaHandler().getCustomDeleteMarkerKeyValue().get();
+    Object deleteMarkerValue =
+        readerContext.getValue(record, readerSchema, markerKeyValue.getLeft());
+    return deleteMarkerValue != null
+        && markerKeyValue.getRight().equals(deleteMarkerValue.toString());
+  }
+
+  /**
+   * Check if the value of column "_hoodie_is_deleted" is true.
+   */
+  protected final boolean isBuiltInDeleteRecord(T record) {
+    if (!shouldCheckBuiltInDeleteMarker) {
+      return false;
+    }
+
+    Object columnValue = readerContext.getValue(
+        record, readerSchema, HOODIE_IS_DELETED_FIELD);
+    return columnValue != null && readerContext.castToBoolean(columnValue);
   }
 
   @Override
@@ -244,8 +280,7 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGr
             Comparable incomingOrderingValue = readerContext.getOrderingValue(
                 Option.of(record), metadata, readerSchema, orderingFieldName);
             if (incomingOrderingValue.compareTo(existingOrderingValue) > 0) {
-              return Option.of(Pair.of(isDeleteRecord(Option.of(record), readerContext.getSchemaFromMetadata(metadata))
-                  ? Option.empty() : Option.of(record), metadata));
+              return Option.of(Pair.of(Option.of(record), metadata));
             }
             return Option.empty();
           case CUSTOM:
@@ -263,7 +298,6 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGr
                  return Option.of(Pair.of(Option.ofNullable(combinedRecordData), metadata));
                 }
               }
-
               return Option.empty();
             } else {
              Option<Pair<HoodieRecord, Schema>> combinedRecordAndSchemaOpt = recordMerger.get().merge(
@@ -285,7 +319,6 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGr
              if (combinedRecord.getData() != existingRecordMetadataPair.getLeft().get()) {
                return Option.of(Pair.of(Option.ofNullable(combinedRecord.getData()), metadata));
               }
-
               return Option.empty();
             }
         }
@@ -445,7 +478,6 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGr
              }
              return Option.ofNullable(readerContext.convertAvroRecord(indexedRecord));
             }
-
             return Option.empty();
           } else {
             Option<Pair<HoodieRecord, Schema>> mergedRecord = recordMerger.get().merge(
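
The two new checks above are what route a log record into the delete path: `isCustomDeleteRecord` compares the configured marker column as a string, and `isBuiltInDeleteRecord` evaluates the boolean `_hoodie_is_deleted` column. A standalone sketch of the same logic against a plain Avro `GenericRecord` (the schema and values are illustrative; the committed code reads fields through `HoodieReaderContext.getValue` instead):

    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;

    public class DeleteMarkerCheckSketch {
      public static void main(String[] args) {
        Schema schema = SchemaBuilder.record("Event").fields()
            .requiredString("id")
            .requiredString("op")                  // custom delete marker column
            .requiredBoolean("_hoodie_is_deleted") // built-in delete flag
            .endRecord();

        GenericRecord record = new GenericData.Record(schema);
        record.put("id", "k1");
        record.put("op", "D");
        record.put("_hoodie_is_deleted", false);

        // Custom delete check: marker column compared by string value.
        Object op = record.get("op");
        boolean customDelete = op != null && "D".equals(op.toString());

        // Built-in delete check: boolean value of "_hoodie_is_deleted".
        Object flag = record.get("_hoodie_is_deleted");
        boolean builtInDelete = flag instanceof Boolean && (boolean) flag;

        System.out.println(customDelete || builtInDelete); // true -> treated as a delete
      }
    }
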
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
index 3d6c23182c7..94d63aac81b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
@@ -78,7 +78,7 @@ public final class HoodieFileGroupReader<T> implements Closeable {
   // Length of bytes to read from the base file
   private final long length;
   // Core structure to store and process records.
-  private final HoodieFileGroupRecordBuffer<T> recordBuffer;
+  private final FileGroupRecordBuffer<T> recordBuffer;
   private ClosableIterator<T> baseFileIterator;
   private final Option<UnaryOperator<T>> outputConverter;
   private final HoodieReadStats readStats;
@@ -127,8 +127,8 @@ public final class HoodieFileGroupReader<T> implements Closeable {
     }
     readerContext.setHasBootstrapBaseFile(hoodieBaseFileOption.isPresent() && hoodieBaseFileOption.get().getBootstrapBaseFile().isPresent());
     readerContext.setSchemaHandler(readerContext.supportsParquetRowIndex()
-        ? new HoodiePositionBasedSchemaHandler<>(readerContext, dataSchema, requestedSchema, internalSchemaOpt, tableConfig, props)
-        : new HoodieFileGroupReaderSchemaHandler<>(readerContext, dataSchema, requestedSchema, internalSchemaOpt, tableConfig, props));
+        ? new PositionBasedSchemaHandler<>(readerContext, dataSchema, requestedSchema, internalSchemaOpt, tableConfig, props)
+        : new FileGroupReaderSchemaHandler<>(readerContext, dataSchema, requestedSchema, internalSchemaOpt, tableConfig, props));
     this.outputConverter = readerContext.getSchemaHandler().getOutputConverter();
     this.readStats = new HoodieReadStats();
     this.recordBuffer = getRecordBuffer(readerContext, hoodieTableMetaClient,
@@ -139,26 +139,26 @@ public final class HoodieFileGroupReader<T> implements Closeable {
   /**
    * Initialize correct record buffer
    */
-  private static HoodieFileGroupRecordBuffer getRecordBuffer(HoodieReaderContext readerContext,
-                                                             HoodieTableMetaClient hoodieTableMetaClient,
-                                                             RecordMergeMode recordMergeMode,
-                                                             TypedProperties props,
-                                                             Option<HoodieBaseFile> baseFileOption,
-                                                             boolean hasNoLogFiles,
-                                                             boolean isSkipMerge,
-                                                             boolean shouldUseRecordPosition,
-                                                             HoodieReadStats readStats) {
+  private static FileGroupRecordBuffer getRecordBuffer(HoodieReaderContext readerContext,
+                                                       HoodieTableMetaClient hoodieTableMetaClient,
+                                                       RecordMergeMode recordMergeMode,
+                                                       TypedProperties props,
+                                                       Option<HoodieBaseFile> baseFileOption,
+                                                       boolean hasNoLogFiles,
+                                                       boolean isSkipMerge,
+                                                       boolean shouldUseRecordPosition,
+                                                       HoodieReadStats readStats) {
     if (hasNoLogFiles) {
       return null;
     } else if (isSkipMerge) {
-      return new HoodieUnmergedFileGroupRecordBuffer<>(
+      return new UnmergedFileGroupRecordBuffer<>(
           readerContext, hoodieTableMetaClient, recordMergeMode, Option.empty(), Option.empty(), props, readStats);
     } else if (shouldUseRecordPosition && baseFileOption.isPresent()) {
-      return new HoodiePositionBasedFileGroupRecordBuffer<>(
+      return new PositionBasedFileGroupRecordBuffer<>(
           readerContext, hoodieTableMetaClient, recordMergeMode, Option.empty(),
           Option.empty(), baseFileOption.get().getCommitTime(), props, readStats);
     } else {
-      return new HoodieKeyBasedFileGroupRecordBuffer<>(
+      return new KeyBasedFileGroupRecordBuffer<>(
           readerContext, hoodieTableMetaClient, recordMergeMode, Option.empty(), Option.empty(), props, readStats);
     }
   }
@@ -191,12 +191,12 @@ public final class HoodieFileGroupReader<T> implements Closeable {
     if (baseFileStoragePathInfo != null) {
       return readerContext.getFileRecordIterator(
           baseFileStoragePathInfo, start, length,
-          readerContext.getSchemaHandler().getDataSchema(),
+          readerContext.getSchemaHandler().getTableSchema(),
           readerContext.getSchemaHandler().getRequiredSchema(), storage);
     } else {
       return readerContext.getFileRecordIterator(
           baseFile.getStoragePath(), start, length,
-          readerContext.getSchemaHandler().getDataSchema(),
+          readerContext.getSchemaHandler().getTableSchema(),
           readerContext.getSchemaHandler().getRequiredSchema(), storage);
     }
   }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/KeyBasedFileGroupRecordBuffer.java
similarity index 78%
rename from hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java
rename to hudi-common/src/main/java/org/apache/hudi/common/table/read/KeyBasedFileGroupRecordBuffer.java
index 144e1c764cf..d19519a3f86 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/KeyBasedFileGroupRecordBuffer.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.HoodieReaderContext;
 import org.apache.hudi.common.model.DeleteRecord;
+import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.log.KeySpec;
 import org.apache.hudi.common.table.log.block.HoodieDataBlock;
@@ -41,21 +42,23 @@ import java.util.Arrays;
 import java.util.Iterator;
 import java.util.Map;
 
+import static org.apache.hudi.common.engine.HoodieReaderContext.INTERNAL_META_RECORD_KEY;
+
 /**
 * A buffer that is used to store log records by {@link org.apache.hudi.common.table.log.HoodieMergedLogRecordReader}
 * by calling the {@link #processDataBlock} and {@link #processDeleteBlock} methods into a record key based map.
 * The records from the base file is accessed from an iterator object. These records are merged when the
 * {@link #hasNext} method is called.
 */
-public class HoodieKeyBasedFileGroupRecordBuffer<T> extends HoodieBaseFileGroupRecordBuffer<T> {
-
-  public HoodieKeyBasedFileGroupRecordBuffer(HoodieReaderContext<T> readerContext,
-                                             HoodieTableMetaClient hoodieTableMetaClient,
-                                             RecordMergeMode recordMergeMode,
-                                             Option<String> partitionNameOverrideOpt,
-                                             Option<String[]> partitionPathFieldOpt,
-                                             TypedProperties props,
-                                             HoodieReadStats readStats) {
+public class KeyBasedFileGroupRecordBuffer<T> extends FileGroupRecordBuffer<T> {
+
+  public KeyBasedFileGroupRecordBuffer(HoodieReaderContext<T> readerContext,
+                                       HoodieTableMetaClient hoodieTableMetaClient,
+                                       RecordMergeMode recordMergeMode,
+                                       Option<String> partitionNameOverrideOpt,
+                                       Option<String[]> partitionPathFieldOpt,
+                                       TypedProperties props,
+                                       HoodieReadStats readStats) {
     super(readerContext, hoodieTableMetaClient, recordMergeMode, partitionNameOverrideOpt, partitionPathFieldOpt, props, readStats);
   }
 
@@ -82,7 +85,12 @@ public class HoodieKeyBasedFileGroupRecordBuffer<T> extends HoodieBaseFileGroupR
         Map<String, Object> metadata = readerContext.generateMetadataForRecord(
             nextRecord, schema);
        String recordKey = (String) metadata.get(HoodieReaderContext.INTERNAL_META_RECORD_KEY);
-        processNextDataRecord(nextRecord, metadata, recordKey);
+
+        if (isBuiltInDeleteRecord(nextRecord) || isCustomDeleteRecord(nextRecord)) {
+          processDeleteRecord(nextRecord, metadata);
+        } else {
+          processNextDataRecord(nextRecord, metadata, recordKey);
+        }
       }
     }
   }
@@ -123,6 +131,20 @@ public class HoodieKeyBasedFileGroupRecordBuffer<T> extends HoodieBaseFileGroupR
     }
   }
 
+  protected void processDeleteRecord(T record, Map<String, Object> metadata) {
+    DeleteRecord deleteRecord = DeleteRecord.create(
+        new HoodieKey(
+            (String) metadata.get(INTERNAL_META_RECORD_KEY),
+            // The partition path of the delete record is set to null because it is not
+            // used, and the delete record is never surfaced from the file group reader
+            null),
+        readerContext.getOrderingValue(
+            Option.of(record), metadata, readerSchema, orderingFieldName));
+    processNextDeletedRecord(
+        deleteRecord,
+        (String) metadata.get(INTERNAL_META_RECORD_KEY));
+  }
+
   @Override
   public boolean containsLogRecord(String recordKey) {
     return records.containsKey(recordKey);
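
`processDeleteRecord` above downgrades a data record carrying a delete marker into a `DeleteRecord` keyed by record key alone, keeping its ordering value so event-time merging still applies. A condensed sketch of that conversion (the key and ordering value are illustrative; assuming the `DeleteRecord.create(HoodieKey, Comparable)` factory used above):

    import org.apache.hudi.common.model.DeleteRecord;
    import org.apache.hudi.common.model.HoodieKey;

    public class DeleteRecordSketch {
      public static void main(String[] args) {
        // Partition path is null: the delete record is merge-internal and is
        // never surfaced from the file group reader.
        DeleteRecord deleteRecord = DeleteRecord.create(new HoodieKey("key1", null), 1710000000L);
        System.out.println(deleteRecord.getRecordKey() + " @ " + deleteRecord.getOrderingValue());
      }
    }
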
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/PositionBasedFileGroupRecordBuffer.java
similarity index 87%
rename from hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
rename to hudi-common/src/main/java/org/apache/hudi/common/table/read/PositionBasedFileGroupRecordBuffer.java
index 74925592f22..b0e039a4b68 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/PositionBasedFileGroupRecordBuffer.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.HoodieReaderContext;
 import org.apache.hudi.common.model.DeleteRecord;
+import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.log.KeySpec;
 import org.apache.hudi.common.table.log.block.HoodieDataBlock;
@@ -61,8 +62,8 @@ import static org.apache.hudi.common.model.HoodieRecord.DEFAULT_ORDERING_VALUE;
  * Here the position means that record position in the base file. The records from the base file is accessed from an iterator object. These records are merged when the
  * {@link #hasNext} method is called.
 */
-public class HoodiePositionBasedFileGroupRecordBuffer<T> extends HoodieKeyBasedFileGroupRecordBuffer<T> {
-  private static final Logger LOG = LoggerFactory.getLogger(HoodiePositionBasedFileGroupRecordBuffer.class);
+public class PositionBasedFileGroupRecordBuffer<T> extends KeyBasedFileGroupRecordBuffer<T> {
+  private static final Logger LOG = LoggerFactory.getLogger(PositionBasedFileGroupRecordBuffer.class);
 
   private static final String ROW_INDEX_COLUMN_NAME = "row_index";
   public static final String ROW_INDEX_TEMPORARY_COLUMN_NAME = "_tmp_metadata_" + ROW_INDEX_COLUMN_NAME;
@@ -70,14 +71,14 @@ public class HoodiePositionBasedFileGroupRecordBuffer<T> extends HoodieKeyBasedF
   private long nextRecordPosition = 0L;
   private boolean needToDoHybridStrategy = false;
 
-  public HoodiePositionBasedFileGroupRecordBuffer(HoodieReaderContext<T> readerContext,
-                                                  HoodieTableMetaClient hoodieTableMetaClient,
-                                                  RecordMergeMode recordMergeMode,
-                                                  Option<String> partitionNameOverrideOpt,
-                                                  Option<String[]> partitionPathFieldOpt,
-                                                  String baseFileInstantTime,
-                                                  TypedProperties props,
-                                                  HoodieReadStats readStats) {
+  public PositionBasedFileGroupRecordBuffer(HoodieReaderContext<T> readerContext,
+                                            HoodieTableMetaClient hoodieTableMetaClient,
+                                            RecordMergeMode recordMergeMode,
+                                            Option<String> partitionNameOverrideOpt,
+                                            Option<String[]> partitionPathFieldOpt,
+                                            String baseFileInstantTime,
+                                            TypedProperties props,
+                                            HoodieReadStats readStats) {
     super(readerContext, hoodieTableMetaClient, recordMergeMode, partitionNameOverrideOpt, partitionPathFieldOpt, props, readStats);
     this.baseFileInstantTime = baseFileInstantTime;
   }
@@ -135,13 +136,13 @@ public class HoodiePositionBasedFileGroupRecordBuffer<T> extends HoodieKeyBasedF
         }
 
         long recordPosition = recordPositions.get(recordIndex++);
-
         T evolvedNextRecord = schemaTransformerWithEvolvedSchema.getLeft().apply(nextRecord);
-        processNextDataRecord(
-            evolvedNextRecord,
-            readerContext.generateMetadataForRecord(evolvedNextRecord, schema),
-            recordPosition
-        );
+        Map<String, Object> metadata = readerContext.generateMetadataForRecord(evolvedNextRecord, schema);
+        if (isBuiltInDeleteRecord(evolvedNextRecord) || isCustomDeleteRecord(evolvedNextRecord)) {
+          processDeleteRecord(evolvedNextRecord, metadata, recordPosition);
+        } else {
+          processNextDataRecord(evolvedNextRecord, metadata, recordPosition);
+        }
       }
     }
   }
@@ -202,6 +203,17 @@ public class HoodiePositionBasedFileGroupRecordBuffer<T> extends HoodieKeyBasedF
     }
   }
 
+  protected void processDeleteRecord(T record, Map<String, Object> metadata, long recordPosition) {
+    DeleteRecord deleteRecord = DeleteRecord.create(
+        new HoodieKey(
+            // The partition path of the delete record is set to null because it is not
+            // used, and the delete record is never surfaced from the file group reader
+            (String) metadata.get(INTERNAL_META_RECORD_KEY), null),
+        readerContext.getOrderingValue(
+            Option.of(record), metadata, readerSchema, orderingFieldName));
+    processNextDeletedRecord(deleteRecord, recordPosition);
+  }
+
   @Override
   public void processNextDeletedRecord(DeleteRecord deleteRecord, Serializable recordPosition) {
     Pair<Option<T>, Map<String, Object>> existingRecordMetadataPair = records.get(recordPosition);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedSchemaHandler.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/PositionBasedSchemaHandler.java
similarity index 83%
rename from hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedSchemaHandler.java
rename to hudi-common/src/main/java/org/apache/hudi/common/table/read/PositionBasedSchemaHandler.java
index 71722f438a7..c91073b4f78 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedSchemaHandler.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/PositionBasedSchemaHandler.java
@@ -35,18 +35,18 @@ import java.util.Collections;
 import java.util.List;
 
 import static org.apache.hudi.avro.AvroSchemaUtils.appendFieldsToSchemaDedupNested;
-import static org.apache.hudi.common.table.read.HoodiePositionBasedFileGroupRecordBuffer.ROW_INDEX_TEMPORARY_COLUMN_NAME;
+import static org.apache.hudi.common.table.read.PositionBasedFileGroupRecordBuffer.ROW_INDEX_TEMPORARY_COLUMN_NAME;
 
 /**
 * This class is responsible for handling the schema for the file group reader that supports positional merge.
  */
-public class HoodiePositionBasedSchemaHandler<T> extends HoodieFileGroupReaderSchemaHandler<T> {
-  public HoodiePositionBasedSchemaHandler(HoodieReaderContext<T> readerContext,
-                                          Schema dataSchema,
-                                          Schema requestedSchema,
-                                          Option<InternalSchema> internalSchemaOpt,
-                                          HoodieTableConfig hoodieTableConfig,
-                                          TypedProperties properties) {
+public class PositionBasedSchemaHandler<T> extends FileGroupReaderSchemaHandler<T> {
+  public PositionBasedSchemaHandler(HoodieReaderContext<T> readerContext,
+                                    Schema dataSchema,
+                                    Schema requestedSchema,
+                                    Option<InternalSchema> internalSchemaOpt,
+                                    HoodieTableConfig hoodieTableConfig,
+                                    TypedProperties properties) {
     super(readerContext, dataSchema, requestedSchema, internalSchemaOpt, hoodieTableConfig, properties);
   }
 
@@ -60,7 +60,7 @@ public class HoodiePositionBasedSchemaHandler<T> extends HoodieFileGroupReaderSc
 
   @Override
  protected Option<InternalSchema> getInternalSchemaOpt(Option<InternalSchema> internalSchemaOpt) {
-    return internalSchemaOpt.map(HoodiePositionBasedSchemaHandler::addPositionalMergeCol);
+    return internalSchemaOpt.map(PositionBasedSchemaHandler::addPositionalMergeCol);
   }
 
   @Override
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieUnmergedFileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/UnmergedFileGroupRecordBuffer.java
similarity index 97%
rename from hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieUnmergedFileGroupRecordBuffer.java
rename to hudi-common/src/main/java/org/apache/hudi/common/table/read/UnmergedFileGroupRecordBuffer.java
index fec5c6fe2c4..7874b994c2a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieUnmergedFileGroupRecordBuffer.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/UnmergedFileGroupRecordBuffer.java
@@ -41,12 +41,12 @@ import java.io.Serializable;
 import java.util.Iterator;
 import java.util.Map;
 
-public class HoodieUnmergedFileGroupRecordBuffer<T> extends HoodieBaseFileGroupRecordBuffer<T> {
+public class UnmergedFileGroupRecordBuffer<T> extends FileGroupRecordBuffer<T> {
   // Used to order the records in the record map.
   private Long putIndex = 0L;
   private Long getIndex = 0L;
 
-  public HoodieUnmergedFileGroupRecordBuffer(
+  public UnmergedFileGroupRecordBuffer(
       HoodieReaderContext<T> readerContext,
       HoodieTableMetaClient hoodieTableMetaClient,
       RecordMergeMode recordMergeMode,
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestFileGroupRecordBuffer.java b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestFileGroupRecordBuffer.java
new file mode 100644
index 00000000000..a4b912e33b5
--- /dev/null
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestFileGroupRecordBuffer.java
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.read;
+
+import org.apache.hudi.common.config.RecordMergeMode;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.model.DeleteRecord;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
+
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hudi.common.engine.HoodieReaderContext.INTERNAL_META_PARTITION_PATH;
+import static org.apache.hudi.common.engine.HoodieReaderContext.INTERNAL_META_RECORD_KEY;
+import static org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_KEY;
+import static org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_MARKER;
+import static org.apache.hudi.common.model.HoodieRecord.DEFAULT_ORDERING_VALUE;
+import static org.apache.hudi.common.table.read.FileGroupRecordBuffer.getOrderingValue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests {@link FileGroupRecordBuffer}
+ */
+class TestFileGroupRecordBuffer {
+  private String schemaString = "{"
+      + "\"type\": \"record\","
+      + "\"name\": \"EventRecord\","
+      + "\"namespace\": \"com.example.avro\","
+      + "\"fields\": ["
+      + "{\"name\": \"id\", \"type\": \"string\"},"
+      + "{\"name\": \"ts\", \"type\": \"long\"},"
+      + "{\"name\": \"op\", \"type\": \"string\"},"
+      + "{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\"}"
+      + "]"
+      + "}";
+  private Schema schema = new Schema.Parser().parse(schemaString);
+  private final HoodieReaderContext readerContext = mock(HoodieReaderContext.class);
+  private final FileGroupReaderSchemaHandler schemaHandler =
+      mock(FileGroupReaderSchemaHandler.class);
+  private HoodieTableMetaClient hoodieTableMetaClient = mock(HoodieTableMetaClient.class);
+  private Option<String> partitionNameOverrideOpt = Option.empty();
+  private Option<String[]> partitionPathFieldOpt = Option.empty();
+  private TypedProperties props = new TypedProperties();
+  private HoodieReadStats readStats = mock(HoodieReadStats.class);
+
+  @BeforeEach
+  void setUp() {
+    when(readerContext.getSchemaHandler()).thenReturn(schemaHandler);
+    when(schemaHandler.getRequiredSchema()).thenReturn(schema);
+    when(readerContext.getRecordMerger()).thenReturn(Option.empty());
+  }
+
+  @Test
+  void testGetOrderingValueFromDeleteRecord() {
+    HoodieReaderContext readerContext = mock(HoodieReaderContext.class);
+    DeleteRecord deleteRecord = mock(DeleteRecord.class);
+    mockDeleteRecord(deleteRecord, null);
+    assertEquals(DEFAULT_ORDERING_VALUE, getOrderingValue(readerContext, deleteRecord));
+    mockDeleteRecord(deleteRecord, DEFAULT_ORDERING_VALUE);
+    assertEquals(DEFAULT_ORDERING_VALUE, getOrderingValue(readerContext, deleteRecord));
+    String orderingValue = "xyz";
+    String convertedValue = "_xyz";
+    mockDeleteRecord(deleteRecord, orderingValue);
+    when(readerContext.convertValueToEngineType(orderingValue)).thenReturn(convertedValue);
+    assertEquals(convertedValue, getOrderingValue(readerContext, deleteRecord));
+  }
+
+  @ParameterizedTest
+  @CsvSource({
+      "true, true, true, EVENT_TIME_ORDERING",
+      "true, false, false, EVENT_TIME_ORDERING",
+      "false, true, false, EVENT_TIME_ORDERING",
+      "false, false, true, EVENT_TIME_ORDERING",
+      "true, true, true, COMMIT_TIME_ORDERING",
+      "true, false, false, COMMIT_TIME_ORDERING",
+      "false, true, false, COMMIT_TIME_ORDERING",
+      "false, false, true, COMMIT_TIME_ORDERING",
+      "true, true, true, CUSTOM",
+      "true, false, false, CUSTOM",
+      "false, true, false, CUSTOM",
+      "false, false, true, CUSTOM",
+      "true, true, true,",
+      "true, false, false,",
+      "false, true, false,",
+      "false, false, true,"
+  })
+  public void testSchemaForMandatoryFields(boolean setPrecombine,
+                                           boolean addHoodieIsDeleted,
+                                           boolean addCustomDeleteMarker,
+                                           RecordMergeMode mergeMode) {
+    HoodieReaderContext readerContext = mock(HoodieReaderContext.class);
+    when(readerContext.getHasBootstrapBaseFile()).thenReturn(false);
+    when(readerContext.getHasLogFiles()).thenReturn(true);
+    HoodieRecordMerger recordMerger = mock(HoodieRecordMerger.class);
+    when(readerContext.getRecordMerger()).thenReturn(Option.of(recordMerger));
+    when(recordMerger.isProjectionCompatible()).thenReturn(false);
+
+    String preCombineField = "ts";
+    String customDeleteKey = "colC";
+    String customDeleteValue = "D";
+    List<String> dataSchemaFields = new ArrayList<>();
+    dataSchemaFields.addAll(Arrays.asList(
+        HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.PARTITION_PATH_METADATA_FIELD, preCombineField,
+        "colA", "colB", "colC", "colD"));
+    if (addHoodieIsDeleted) {
+      dataSchemaFields.add(HoodieRecord.HOODIE_IS_DELETED_FIELD);
+    }
+
+    Schema dataSchema = getSchema(dataSchemaFields);
+    Schema requestedSchema = getSchema(Arrays.asList(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.PARTITION_PATH_METADATA_FIELD));
+
+    HoodieTableConfig tableConfig = mock(HoodieTableConfig.class);
+    when(tableConfig.getRecordMergeMode()).thenReturn(mergeMode);
+    when(tableConfig.populateMetaFields()).thenReturn(true);
+    when(tableConfig.getPreCombineField()).thenReturn(setPrecombine ? preCombineField : StringUtils.EMPTY_STRING);
+
+    TypedProperties props = new TypedProperties();
+    if (addCustomDeleteMarker) {
+      props.setProperty(DELETE_KEY, customDeleteKey);
+      props.setProperty(DELETE_MARKER, customDeleteValue);
+    }
+    FileGroupReaderSchemaHandler fileGroupReaderSchemaHandler = new FileGroupReaderSchemaHandler(readerContext,
+        dataSchema, requestedSchema, Option.empty(), tableConfig, props);
+    List<String> expectedFields = new ArrayList<>();
+    expectedFields.add(HoodieRecord.RECORD_KEY_METADATA_FIELD);
+    expectedFields.add(HoodieRecord.PARTITION_PATH_METADATA_FIELD);
+    if (addCustomDeleteMarker) {
+      expectedFields.add(customDeleteKey);
+    }
+    if (setPrecombine && mergeMode != RecordMergeMode.COMMIT_TIME_ORDERING) { // commit time ordering does not project the ordering field.
+      expectedFields.add(preCombineField);
+    }
+    if (addHoodieIsDeleted) {
+      expectedFields.add(HoodieRecord.HOODIE_IS_DELETED_FIELD);
+    }
+    Schema expectedSchema = mergeMode == RecordMergeMode.CUSTOM ? dataSchema : getSchema(expectedFields);
+    Schema actualSchema = fileGroupReaderSchemaHandler.generateRequiredSchema();
+    assertEquals(expectedSchema, actualSchema);
+    assertEquals(addHoodieIsDeleted, fileGroupReaderSchemaHandler.hasBuiltInDelete());
+    assertEquals(addCustomDeleteMarker
+            ? Option.of(Pair.of(customDeleteKey, customDeleteValue)) : Option.empty(),
+        fileGroupReaderSchemaHandler.getCustomDeleteMarkerKeyValue());
+  }
+
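+  // Configuring only one of DELETE_KEY / DELETE_MARKER is invalid and must fail construction.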
+  @ParameterizedTest
+  @CsvSource({"true,false", "false,true"})
+  void testInvalidCustomDeleteConfigs(boolean configureCustomDeleteKey,
+                                      boolean configureCustomDeleteMarker) {
+    HoodieReaderContext readerContext = mock(HoodieReaderContext.class);
+    when(readerContext.getHasBootstrapBaseFile()).thenReturn(false);
+    when(readerContext.getHasLogFiles()).thenReturn(true);
+    HoodieRecordMerger recordMerger = mock(HoodieRecordMerger.class);
+    when(readerContext.getRecordMerger()).thenReturn(Option.of(recordMerger));
+    when(recordMerger.isProjectionCompatible()).thenReturn(false);
+
+    String customDeleteKey = "colC";
+    String customDeleteValue = "D";
+    List<String> dataSchemaFields = new ArrayList<>(Arrays.asList(
+        HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.PARTITION_PATH_METADATA_FIELD,
+        "colA", "colB", "colC", "colD"));
+
+    Schema dataSchema = getSchema(dataSchemaFields);
+    Schema requestedSchema = getSchema(Arrays.asList(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.PARTITION_PATH_METADATA_FIELD));
+
+    HoodieTableConfig tableConfig = mock(HoodieTableConfig.class);
+
+    TypedProperties props = new TypedProperties();
+    if (configureCustomDeleteKey) {
+      props.setProperty(DELETE_KEY, customDeleteKey);
+    }
+    if (configureCustomDeleteMarker) {
+      props.setProperty(DELETE_MARKER, customDeleteValue);
+    }
+    Throwable exception = assertThrows(IllegalArgumentException.class,
+        () -> new FileGroupReaderSchemaHandler(readerContext,
+            dataSchema, requestedSchema, Option.empty(), tableConfig, props));
+    assertEquals("Either custom delete key or marker is not specified",
+        exception.getMessage());
+  }
+
+  private Schema getSchema(List<String> fields) {
+    SchemaBuilder.FieldAssembler<Schema> schemaFieldAssembler = SchemaBuilder.builder().record("test_schema")
+        .namespace("test_namespace").fields();
+    for (String field : fields) {
+      schemaFieldAssembler = schemaFieldAssembler.name(field).type().stringType().noDefault();
+    }
+    }
+    return schemaFieldAssembler.endRecord();
+  }
+
+  private void mockDeleteRecord(DeleteRecord deleteRecord,
+                                Comparable orderingValue) {
+    when(deleteRecord.getOrderingValue()).thenReturn(orderingValue);
+  }
+
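+  // A record counts as a custom delete only when DELETE_KEY/DELETE_MARKER are set and the delete field equals the marker.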
+  @Test
+  void testIsCustomDeleteRecord() {
+    String customDeleteKey = "op";
+    String customDeleteValue = "d";
+    GenericRecord record = new GenericData.Record(schema);
+    record.put("id", "12345");
+    record.put("ts", System.currentTimeMillis());
+    record.put(customDeleteKey, "d");
+
+    when(schemaHandler.getCustomDeleteMarkerKeyValue())
+        .thenReturn(Option.of(Pair.of(customDeleteKey, customDeleteValue)));
+    KeyBasedFileGroupRecordBuffer keyBasedBuffer =
+        new KeyBasedFileGroupRecordBuffer(
+            readerContext,
+            hoodieTableMetaClient,
+            RecordMergeMode.COMMIT_TIME_ORDERING,
+            partitionNameOverrideOpt,
+            partitionPathFieldOpt,
+            props,
+            readStats);
+    when(readerContext.getValue(any(), any(), any())).thenReturn(null);
+    assertFalse(keyBasedBuffer.isCustomDeleteRecord(record));
+
+    props.setProperty(DELETE_KEY, customDeleteKey);
+    props.setProperty(DELETE_MARKER, customDeleteValue);
+    keyBasedBuffer = new KeyBasedFileGroupRecordBuffer(
+            readerContext,
+            hoodieTableMetaClient,
+            RecordMergeMode.COMMIT_TIME_ORDERING,
+            partitionNameOverrideOpt,
+            partitionPathFieldOpt,
+            props,
+            readStats);
+    when(readerContext.getValue(any(), any(), any())).thenReturn("i");
+    assertFalse(keyBasedBuffer.isCustomDeleteRecord(record));
+    when(readerContext.getValue(any(), any(), any())).thenReturn("d");
+    assertTrue(keyBasedBuffer.isCustomDeleteRecord(record));
+  }
+
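+  // Both custom-marker deletes and _hoodie_is_deleted=true records should be buffered as empty payloads keyed by record key.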
+  @Test
+  void testProcessCustomDeleteRecord() {
+    String customDeleteKey = "op";
+    String customDeleteValue = "d";
+    when(schemaHandler.getCustomDeleteMarkerKeyValue())
+        .thenReturn(Option.of(Pair.of(customDeleteKey, customDeleteValue)));
+    when(schemaHandler.hasBuiltInDelete()).thenReturn(true);
+    KeyBasedFileGroupRecordBuffer keyBasedBuffer =
+        new KeyBasedFileGroupRecordBuffer(
+            readerContext,
+            hoodieTableMetaClient,
+            RecordMergeMode.COMMIT_TIME_ORDERING,
+            partitionNameOverrideOpt,
+            partitionPathFieldOpt,
+            props,
+            readStats);
+
+    // CASE 1: With custom delete marker.
+    GenericRecord record = new GenericData.Record(schema);
+    record.put("id", "12345");
+    record.put("ts", System.currentTimeMillis());
+    record.put("op", "d");
+    record.put("_hoodie_is_deleted", false);
+
+    Map<String, Object> metadata = new HashMap<>();
+    metadata.put(INTERNAL_META_RECORD_KEY, "12345");
+    metadata.put(INTERNAL_META_PARTITION_PATH, "partition1");
+    when(readerContext.getOrderingValue(any(), any(), any(), any())).thenReturn(1);
+    when(readerContext.generateMetadataForRecord(any(), any(), any())).thenReturn(metadata);
+    keyBasedBuffer.processDeleteRecord(record, metadata);
+    Map<Serializable, Pair<Option<GenericRecord>, Map<String, Object>>> records =
+        keyBasedBuffer.getLogRecords();
+    assertEquals(1, records.size());
+    assertEquals(Pair.of(Option.empty(), metadata), records.get("12345"));
+
+    // CASE 2: With _hoodie_is_deleted is true.
+    GenericRecord anotherRecord = new GenericData.Record(schema);
+    anotherRecord.put("id", "54321");
+    anotherRecord.put("ts", System.currentTimeMillis());
+    anotherRecord.put("op", "i");
+    anotherRecord.put("_hoodie_is_deleted", true);
+
+    Map<String, Object> anotherMetadata = new HashMap<>();
+    anotherMetadata.put(INTERNAL_META_RECORD_KEY, "54321");
+    anotherMetadata.put(INTERNAL_META_PARTITION_PATH, "partition2");
+    when(readerContext.generateMetadataForRecord(any(), any(), any())).thenReturn(anotherMetadata);
+    keyBasedBuffer.processDeleteRecord(anotherRecord, anotherMetadata);
+    records = keyBasedBuffer.getLogRecords();
+    assertEquals(2, records.size());
+    assertEquals(Pair.of(Option.empty(), anotherMetadata), records.get("54321"));
+  }
+}
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
index e72d5dd40ce..3f1720734a2 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
@@ -359,7 +359,7 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
     }
     if (fileSlice.getBaseFile().get().getBootstrapBaseFile().isPresent()) {
      //TODO: [HUDI-8169] this code path will not hit until we implement bootstrap tests
-      Pair<List<Schema.Field>, List<Schema.Field>> dataAndMetaCols = HoodieFileGroupReaderSchemaHandler.getDataAndMetaCols(requestedSchema);
+      Pair<List<Schema.Field>, List<Schema.Field>> dataAndMetaCols = FileGroupReaderSchemaHandler.getDataAndMetaCols(requestedSchema);
       return !dataAndMetaCols.getLeft().isEmpty() && !dataAndMetaCols.getRight().isEmpty();
     }
     return false;
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupRecordBuffer.java b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupRecordBuffer.java
deleted file mode 100644
index ee369410c8e..00000000000
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupRecordBuffer.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hudi.common.table.read;
-
-import org.apache.hudi.common.config.RecordMergeMode;
-import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.engine.HoodieReaderContext;
-import org.apache.hudi.common.model.DeleteRecord;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordMerger;
-import org.apache.hudi.common.table.HoodieTableConfig;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.StringUtils;
-
-import org.apache.avro.Schema;
-import org.apache.avro.SchemaBuilder;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.CsvSource;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import static org.apache.hudi.common.model.HoodieRecord.DEFAULT_ORDERING_VALUE;
-import static org.apache.hudi.common.table.read.HoodieBaseFileGroupRecordBuffer.getOrderingValue;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-/**
- * Tests {@link HoodieBaseFileGroupRecordBuffer}
- */
-public class TestHoodieFileGroupRecordBuffer {
-  @Test
-  void testGetOrderingValueFromDeleteRecord() {
-    HoodieReaderContext readerContext = mock(HoodieReaderContext.class);
-    DeleteRecord deleteRecord = mock(DeleteRecord.class);
-    mockDeleteRecord(deleteRecord, null);
-    assertEquals(DEFAULT_ORDERING_VALUE, getOrderingValue(readerContext, deleteRecord));
-    mockDeleteRecord(deleteRecord, DEFAULT_ORDERING_VALUE);
-    assertEquals(DEFAULT_ORDERING_VALUE, getOrderingValue(readerContext, deleteRecord));
-    String orderingValue = "xyz";
-    String convertedValue = "_xyz";
-    mockDeleteRecord(deleteRecord, orderingValue);
-    when(readerContext.convertValueToEngineType(orderingValue)).thenReturn(convertedValue);
-    assertEquals(convertedValue, getOrderingValue(readerContext, deleteRecord));
-  }
-
-  @ParameterizedTest
-  @CsvSource({
-      "true, true, EVENT_TIME_ORDERING",
-      "true, false, EVENT_TIME_ORDERING",
-      "false, true, EVENT_TIME_ORDERING",
-      "false, false, EVENT_TIME_ORDERING",
-      "true, true, COMMIT_TIME_ORDERING",
-      "true, false, COMMIT_TIME_ORDERING",
-      "false, true, COMMIT_TIME_ORDERING",
-      "false, false, COMMIT_TIME_ORDERING",
-      "true, true, CUSTOM",
-      "true, false, CUSTOM",
-      "false, true, CUSTOM",
-      "false, false, CUSTOM",
-      "true, true,",
-      "true, false,",
-      "false, true,",
-      "false, false,"
-  })
-  public void testSchemaForMandatoryFields(boolean setPrecombine, boolean addHoodieIsDeleted, RecordMergeMode mergeMode) {
-    HoodieReaderContext readerContext = mock(HoodieReaderContext.class);
-    when(readerContext.getHasBootstrapBaseFile()).thenReturn(false);
-    when(readerContext.getHasLogFiles()).thenReturn(true);
-    HoodieRecordMerger recordMerger = mock(HoodieRecordMerger.class);
-    when(readerContext.getRecordMerger()).thenReturn(Option.of(recordMerger));
-    when(recordMerger.isProjectionCompatible()).thenReturn(false);
-
-    String preCombineField = "ts";
-    List<String> dataSchemaFields = new ArrayList<>();
-    dataSchemaFields.addAll(Arrays.asList(
-        HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.PARTITION_PATH_METADATA_FIELD, preCombineField,
-        "colA", "colB", "colC", "colD"));
-    if (addHoodieIsDeleted) {
-      dataSchemaFields.add(HoodieRecord.HOODIE_IS_DELETED_FIELD);
-    }
-
-    Schema dataSchema = getSchema(dataSchemaFields);
-    Schema requestedSchema = getSchema(Arrays.asList(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.PARTITION_PATH_METADATA_FIELD));
-
-    HoodieTableConfig tableConfig = mock(HoodieTableConfig.class);
-    when(tableConfig.getRecordMergeMode()).thenReturn(mergeMode);
-    when(tableConfig.populateMetaFields()).thenReturn(true);
-    when(tableConfig.getPreCombineField()).thenReturn(setPrecombine ? preCombineField : StringUtils.EMPTY_STRING);
-
-    TypedProperties props = new TypedProperties();
-    HoodieFileGroupReaderSchemaHandler fileGroupReaderSchemaHandler = new HoodieFileGroupReaderSchemaHandler(readerContext,
-        dataSchema, requestedSchema, Option.empty(), tableConfig, props);
-    List<String> expectedFields = new ArrayList();
-    expectedFields.add(HoodieRecord.RECORD_KEY_METADATA_FIELD);
-    expectedFields.add(HoodieRecord.PARTITION_PATH_METADATA_FIELD);
-    if (setPrecombine && mergeMode != RecordMergeMode.COMMIT_TIME_ORDERING) { // commit time ordering does not project ordering field.
-      expectedFields.add(preCombineField);
-    }
-    if (addHoodieIsDeleted) {
-      expectedFields.add(HoodieRecord.HOODIE_IS_DELETED_FIELD);
-    }
-    Schema expectedSchema = mergeMode == RecordMergeMode.CUSTOM ? dataSchema : getSchema(expectedFields);
-    Schema actualSchema = fileGroupReaderSchemaHandler.generateRequiredSchema();
-    assertEquals(expectedSchema, actualSchema);
-  }
-
-  private Schema getSchema(List<String> fields) {
-    SchemaBuilder.FieldAssembler<Schema> schemaFieldAssembler = SchemaBuilder.builder().record("test_schema")
-        .namespace("test_namespace").fields();
-    for (String field : fields) {
-      schemaFieldAssembler = schemaFieldAssembler.name(field).type().stringType().noDefault();
-    }
-    return schemaFieldAssembler.endRecord();
-  }
-
-  private void mockDeleteRecord(DeleteRecord deleteRecord,
-                                Comparable orderingValue) {
-    when(deleteRecord.getOrderingValue()).thenReturn(orderingValue);
-  }
-}
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
index fda3018e778..b0eaf6cb25c 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
@@ -208,6 +208,16 @@ public class HiveHoodieReaderContext extends HoodieReaderContext<ArrayWritable>
     return StringUtils.isNullOrEmpty(fieldName) ? null : objectInspectorCache.getValue(record, schema, fieldName);
   }
 
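+  // The file group reader evaluates boolean delete flags (e.g. _hoodie_is_deleted); Hive wraps booleans in BooleanWritable.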
+  @Override
+  public boolean castToBoolean(Object value) {
+    if (value instanceof BooleanWritable) {
+      return ((BooleanWritable) value).get();
+    } else {
+      throw new IllegalArgumentException(
+          "Expected BooleanWritable but got " + value.getClass());
+    }
+  }
+
   @Override
   public HoodieRecord<ArrayWritable> constructHoodieRecord(Option<ArrayWritable> recordOption, Map<String, Object> metadataMap) {
     if (!recordOption.isPresent()) {
diff --git a/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/spark/execution/datasources/parquet/TestSparkFileFormatInternalRowReaderContext.scala b/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/spark/execution/datasources/parquet/TestSparkFileFormatInternalRowReaderContext.scala
index 336e1bbe83d..fec096920aa 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/spark/execution/datasources/parquet/TestSparkFileFormatInternalRowReaderContext.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/spark/execution/datasources/parquet/TestSparkFileFormatInternalRowReaderContext.scala
@@ -22,7 +22,7 @@ package org.apache.spark.execution.datasources.parquet
 import org.apache.hudi.SparkFileFormatInternalRowReaderContext
 import org.apache.hudi.SparkFileFormatInternalRowReaderContext.filterIsSafeForBootstrap
 import org.apache.hudi.common.model.HoodieRecord
-import org.apache.hudi.common.table.read.HoodiePositionBasedFileGroupRecordBuffer.ROW_INDEX_TEMPORARY_COLUMN_NAME
+import org.apache.hudi.common.table.read.PositionBasedFileGroupRecordBuffer.ROW_INDEX_TEMPORARY_COLUMN_NAME
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
 
 import org.apache.spark.sql.execution.datasources.parquet.SparkParquetReader
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodiePositionBasedFileGroupRecordBuffer.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestPositionBasedFileGroupRecordBuffer.java
similarity index 96%
rename from hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodiePositionBasedFileGroupRecordBuffer.java
rename to hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestPositionBasedFileGroupRecordBuffer.java
index 3c80ef83919..c927893a115 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodiePositionBasedFileGroupRecordBuffer.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestPositionBasedFileGroupRecordBuffer.java
@@ -33,9 +33,9 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock;
-import org.apache.hudi.common.table.read.HoodiePositionBasedFileGroupRecordBuffer;
-import org.apache.hudi.common.table.read.HoodiePositionBasedSchemaHandler;
 import org.apache.hudi.common.table.read.HoodieReadStats;
+import org.apache.hudi.common.table.read.PositionBasedFileGroupRecordBuffer;
+import org.apache.hudi.common.table.read.PositionBasedSchemaHandler;
 import org.apache.hudi.common.table.read.TestHoodieFileGroupReaderOnSpark;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.testutils.SchemaTestUtil;
@@ -70,11 +70,11 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 
-public class TestHoodiePositionBasedFileGroupRecordBuffer extends TestHoodieFileGroupReaderOnSpark {
+public class TestPositionBasedFileGroupRecordBuffer extends TestHoodieFileGroupReaderOnSpark {
   private final HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(0xDEEF);
   private HoodieTableMetaClient metaClient;
   private Schema avroSchema;
-  private HoodiePositionBasedFileGroupRecordBuffer<InternalRow> buffer;
+  private PositionBasedFileGroupRecordBuffer<InternalRow> buffer;
   private String partitionPath;
   private HoodieReadStats readStats;
 
@@ -123,7 +123,7 @@ public class TestHoodiePositionBasedFileGroupRecordBuffer extends TestHoodieFile
     } else {
       ctx.setRecordMerger(Option.empty());
     }
-    ctx.setSchemaHandler(new HoodiePositionBasedSchemaHandler<>(ctx, avroSchema, avroSchema,
+    ctx.setSchemaHandler(new PositionBasedSchemaHandler<>(ctx, avroSchema, avroSchema,
         Option.empty(), metaClient.getTableConfig(), new TypedProperties()));
     TypedProperties props = new TypedProperties();
     props.put("hoodie.write.record.merge.mode", mergeMode.name());
@@ -136,7 +136,7 @@ public class TestHoodiePositionBasedFileGroupRecordBuffer extends TestHoodieFile
       writeConfigs.put(HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key(), HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID);
     }
     readStats = new HoodieReadStats();
-    buffer = new HoodiePositionBasedFileGroupRecordBuffer<>(
+    buffer = new PositionBasedFileGroupRecordBuffer<>(
         ctx,
         metaClient,
         mergeMode,
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
index ae5d45d7dfa..4120956b4ce 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
@@ -19,15 +19,22 @@
 
 package org.apache.hudi.common.table.read
 
-import org.apache.hudi.{SparkAdapterSupport, SparkFileFormatInternalRowReaderContext}
-import org.apache.hudi.common.config.HoodieReaderConfig
+import org.apache.hudi.{DataSourceWriteOptions, SparkAdapterSupport, SparkFileFormatInternalRowReaderContext}
+import org.apache.hudi.DataSourceWriteOptions.{OPERATION, PRECOMBINE_FIELD, RECORDKEY_FIELD, TABLE_TYPE}
+import org.apache.hudi.common.config.{HoodieReaderConfig, RecordMergeMode}
 import org.apache.hudi.common.config.HoodieReaderConfig.FILE_GROUP_READER_ENABLED
 import org.apache.hudi.common.engine.HoodieReaderContext
+import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.{FileSlice, HoodieRecord, WriteOperationType}
+import org.apache.hudi.common.model.DefaultHoodieRecordPayload.{DELETE_KEY, DELETE_MARKER}
 import org.apache.hudi.common.model.HoodieRecord.DEFAULT_ORDERING_VALUE
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.read.TestHoodieFileGroupReaderOnSpark.getFileCount
 import org.apache.hudi.common.testutils.{HoodieTestUtils, RawTripTestPayload}
 import org.apache.hudi.common.util.{Option => HOption}
-import org.apache.hudi.storage.StorageConfiguration
+import org.apache.hudi.config.{HoodieCompactionConfig, HoodieWriteConfig}
+import org.apache.hudi.storage.{StorageConfiguration, StoragePath}
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
 
 import org.apache.avro.Schema
 import org.apache.hadoop.conf.Configuration
@@ -39,7 +46,9 @@ import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.unsafe.types.UTF8String
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
-import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.{Arguments, MethodSource}
 import org.mockito.Mockito
 
 import java.util
@@ -165,6 +174,111 @@ class TestHoodieFileGroupReaderOnSpark extends TestHoodieFileGroupReaderBase[Int
       sparkReaderContext, row, avroSchema, "non_existent_col", DEFAULT_ORDERING_VALUE)
   }
 
+  val expectedEventTimeBased: Seq[(Int, String, String, String, Double, String)] = Seq(
+    (10, "5", "rider-E", "driver-E", 17.85, "i"),
+    (10, "3", "rider-C", "driver-C", 33.9, "i"),
+    (10, "2", "rider-B", "driver-B", 27.7, "i"),
+    (20, "1", "rider-Z", "driver-Z", 27.7, "i"))
+  val expectedCommitTimeBased: Seq[(Int, String, String, String, Double, String)] = Seq(
+    (10, "5", "rider-E", "driver-E", 17.85, "i"),
+    (10, "3", "rider-C", "driver-C", 33.9, "i"),
+    (20, "1", "rider-Z", "driver-Z", 27.7, "i"))
+
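+  // Flow: insert five records, delete via the custom "op" marker and via the DELETE operation, upsert key "1" back, then validate the snapshot per merge mode.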
+  @ParameterizedTest
+  @MethodSource(Array("customDeleteTestParams"))
+  def testCustomDelete(useFgReader: String,
+                       tableType: String,
+                       positionUsed: String,
+                       mergeMode: String): Unit = {
+    val payloadClass = "org.apache.hudi.common.table.read.CustomPayloadForTesting"
+    val fgReaderOpts: Map[String, String] = Map(
+      HoodieWriteConfig.MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key -> "0",
+      HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key -> useFgReader,
+      HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS.key -> positionUsed,
+      HoodieWriteConfig.RECORD_MERGE_MODE.key -> mergeMode
+    )
+    val deleteOpts: Map[String, String] = Map(
+      DELETE_KEY -> "op", DELETE_MARKER -> "d")
+    val readOpts = if (mergeMode.equals("CUSTOM")) {
+      fgReaderOpts ++ deleteOpts ++ Map(
+        HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key -> payloadClass)
+    } else {
+      fgReaderOpts ++ deleteOpts
+    }
+    val opts = readOpts
+    val columns = Seq("ts", "key", "rider", "driver", "fare", "op")
+
+    val data = Seq(
+      (10, "1", "rider-A", "driver-A", 19.10, "i"),
+      (10, "2", "rider-B", "driver-B", 27.70, "i"),
+      (10, "3", "rider-C", "driver-C", 33.90, "i"),
+      (10, "4", "rider-D", "driver-D", 34.15, "i"),
+      (10, "5", "rider-E", "driver-E", 17.85, "i"))
+    val inserts = spark.createDataFrame(data).toDF(columns: _*)
+    inserts.write.format("hudi").
+      option(RECORDKEY_FIELD.key(), "key").
+      option(PRECOMBINE_FIELD.key(), "ts").
+      option(TABLE_TYPE.key(), tableType).
+      option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table").
+      option(HoodieCompactionConfig.INLINE_COMPACT.key(), "false").
+      options(opts).
+      mode(SaveMode.Overwrite).
+      save(getBasePath)
+    val metaClient = HoodieTableMetaClient
+      .builder().setConf(getStorageConf).setBasePath(getBasePath).build
+    assertEquals((1, 0), getFileCount(metaClient, getBasePath))
+
+    // Delete using delete markers.
+    val updateData = Seq(
+      (11, "1", "rider-X", "driver-X", 19.10, "d"),
+      (9, "2", "rider-Y", "driver-Y", 27.70, "d"))
+    val updates = spark.createDataFrame(updateData).toDF(columns: _*)
+    updates.write.format("hudi").
+      option(OPERATION.key(), "upsert").
+      option(HoodieCompactionConfig.INLINE_COMPACT.key(), "false").
+      options(opts).
+      mode(SaveMode.Append).
+      save(getBasePath)
+    assertEquals((1, 1), getFileCount(metaClient, getBasePath))
+
+    // Delete using the DELETE write operation.
+    val deletesData = Seq((-5, "4", "rider-D", "driver-D", 34.15, 6))
+    val deletes = spark.createDataFrame(deletesData).toDF(columns: _*)
+    deletes.write.format("hudi").
+      option(OPERATION.key(), "DELETE").
+      option(HoodieCompactionConfig.INLINE_COMPACT.key(), "false").
+      options(opts).
+      mode(SaveMode.Append).
+      save(getBasePath)
+    assertEquals((1, 2), getFileCount(metaClient, getBasePath))
+
+    // Add a record back to ensure event-time ordering works.
+    val updateDataSecond = Seq(
+      (20, "1", "rider-Z", "driver-Z", 27.70, "i"))
+    val updatesSecond = spark.createDataFrame(updateDataSecond).toDF(columns: _*)
+    updatesSecond.write.format("hudi").
+      option(OPERATION.key(), "upsert").
+      option(HoodieCompactionConfig.INLINE_COMPACT.key(), "false").
+      options(opts).
+      mode(SaveMode.Append).
+      save(getBasePath)
+    // Validate base and log file counts.
+    assertEquals((1, 3), getFileCount(metaClient, getBasePath))
+
+    // Validate the final snapshot contents.
+    val columnsToCompare = Set("ts", "key", "rider", "driver", "fare", "op")
+    val df = spark.read.options(readOpts).format("hudi").load(getBasePath)
+    val finalDf = df.select("ts", "key", "rider", "driver", "fare", "op").sort("key")
+    val expected = if (mergeMode == RecordMergeMode.EVENT_TIME_ORDERING.name()) {
+      expectedEventTimeBased
+    } else {
+      expectedCommitTimeBased
+    }
+    val expectedDf = spark.createDataFrame(expected).toDF(columns: _*).sort("key")
+    assertTrue(
+      SparkClientFunctionalTestHarness.areDataframesEqual(expectedDf, finalDf, columnsToCompare.asJava))
+  }
+
   private def testGetOrderingValue(sparkReaderContext: HoodieReaderContext[InternalRow],
                                    row: InternalRow,
                                    avroSchema: Schema,
@@ -177,3 +291,26 @@ class TestHoodieFileGroupReaderOnSpark extends TestHoodieFileGroupReaderBase[Int
       metadataMap.get(HoodieReaderContext.INTERNAL_META_ORDERING_FIELD))
   }
 }
+
+object TestHoodieFileGroupReaderOnSpark {
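+  // Arguments: (useFgReader, tableType, positionUsed, mergeMode) combinations for testCustomDelete.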
+  def customDeleteTestParams(): java.util.List[Arguments] = {
+    java.util.Arrays.asList(
+      Arguments.of("true", "MERGE_ON_READ", "false", "EVENT_TIME_ORDERING"),
+      Arguments.of("true", "MERGE_ON_READ", "true", "EVENT_TIME_ORDERING"),
+      Arguments.of("true", "MERGE_ON_READ", "false", "COMMIT_TIME_ORDERING"),
+      Arguments.of("true", "MERGE_ON_READ", "true", "COMMIT_TIME_ORDERING"),
+      Arguments.of("true", "MERGE_ON_READ", "false", "CUSTOM"),
+      Arguments.of("true", "MERGE_ON_READ", "true", "CUSTOM"))
+  }
+
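+  // Returns (base file count, log file count) for files directly under the table base path.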
+  def getFileCount(metaClient: HoodieTableMetaClient, basePath: String): (Long, Long) = {
+    val newMetaClient = HoodieTableMetaClient.reload(metaClient)
+    val files = newMetaClient.getStorage.listFiles(new StoragePath(basePath))
+    (files.stream().filter(f =>
+      f.getPath.getParent.equals(new StoragePath(basePath))
+        && FSUtils.isBaseFile(f.getPath)).count(),
+      files.stream().filter(f =>
+        f.getPath.getParent.equals(new StoragePath(basePath))
+          && FSUtils.isLogFile(f.getPath)).count())
+  }
+}

