This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 6727c45ce28 [HUDI-6795] Implement writing record_positions to log 
blocks for updates and deletes (#9581)
6727c45ce28 is described below

commit 6727c45ce289be41d7ddde14406b274d3f926f2e
Author: Y Ethan Guo <[email protected]>
AuthorDate: Tue Oct 10 00:58:37 2023 -0700

    [HUDI-6795] Implement writing record_positions to log blocks for updates 
and deletes (#9581)
    
    Implement writing record positions to the block header for data blocks 
containing updates and delete blocks.
    
    - Add a new long field `position` to the `HoodieRecordLocation` model. It 
is the position of a record in the file, e.g., row position starting from 0 in 
the Parquet file.
    - Positions will only be written if `hoodie.write.record.positions` is set 
to true.
    - Changes in `HoodieDataBlock` and `HoodieDeleteBlock` to handle writing 
record positions.
    - Change `HoodieFileReader#filterRowKeys` to return not only the set of 
filtered keys but also their positions in the file.
    - Log format tests for the new header and e2e tests for MOR table with the 
aforementioned config enabled.
    
    ---------
    
    Co-authored-by: Sagar Sumit <[email protected]>
---
 .../cli/commands/TestHoodieLogFileCommand.java     |  4 +-
 .../org/apache/hudi/config/HoodieWriteConfig.java  | 17 +++++
 .../org/apache/hudi/index/HoodieIndexUtils.java    | 19 +++--
 .../bloom/ListBasedHoodieBloomIndexHelper.java     | 15 ++--
 .../org/apache/hudi/io/HoodieAppendHandle.java     | 24 ++++---
 .../hudi/io/HoodieKeyLocationFetchHandle.java      | 20 +++---
 .../org/apache/hudi/io/HoodieKeyLookupHandle.java  |  8 +--
 .../org/apache/hudi/io/HoodieKeyLookupResult.java  | 12 ++--
 .../metadata/HoodieBackedTableMetadataWriter.java  |  4 +-
 .../utils/TestLegacyArchivedMetaEntryReader.java   |  2 +-
 .../hudi/testutils/HoodieWriteableTestTable.java   |  3 +-
 .../index/bloom/TestFlinkHoodieBloomIndex.java     | 11 ++-
 .../testutils/HoodieFlinkWriteableTestTable.java   |  2 +-
 .../index/bloom/HoodieFileProbingFunction.java     |  9 +--
 .../index/bloom/SparkHoodieBloomIndexHelper.java   | 10 +--
 .../hudi/io/storage/HoodieSparkParquetReader.java  | 15 ++--
 .../hudi/index/bloom/TestHoodieBloomIndex.java     | 11 ++-
 .../hudi/io/TestHoodieKeyLocationFetchHandle.java  |  4 +-
 .../org/apache/hudi/common/model/HoodieRecord.java | 16 +++--
 .../common/model/HoodieRecordGlobalLocation.java   | 15 ++--
 .../hudi/common/model/HoodieRecordLocation.java    | 32 ++++++++-
 .../hudi/common/table/log/LogReaderUtils.java      |  3 +-
 .../table/log/block/HoodieAvroDataBlock.java       |  9 +--
 .../common/table/log/block/HoodieCDCDataBlock.java |  2 +-
 .../common/table/log/block/HoodieDataBlock.java    | 24 +++++++
 .../common/table/log/block/HoodieDeleteBlock.java  | 38 +++++++++-
 .../table/log/block/HoodieHFileDataBlock.java      |  2 +-
 .../common/table/log/block/HoodieLogBlock.java     | 19 ++++-
 .../table/log/block/HoodieParquetDataBlock.java    |  3 +-
 .../org/apache/hudi/common/util/BaseFileUtils.java | 31 +++++----
 .../java/org/apache/hudi/common/util/OrcUtils.java | 33 +++++----
 .../org/apache/hudi/common/util/ParquetUtils.java  | 45 +++++++-----
 .../hudi/io/storage/HoodieAvroHFileReader.java     | 30 ++++----
 .../hudi/io/storage/HoodieAvroOrcReader.java       |  5 +-
 .../hudi/io/storage/HoodieAvroParquetReader.java   |  3 +-
 .../hudi/io/storage/HoodieBootstrapFileReader.java |  3 +-
 .../apache/hudi/io/storage/HoodieFileReader.java   |  6 +-
 .../common/functional/TestHoodieLogFormat.java     | 81 ++++++++++++++++------
 .../TestHoodieLogFormatAppendFailure.java          |  3 +-
 .../hudi/common/table/log/TestLogReaderUtils.java  | 15 ++--
 .../table/log/block/TestHoodieDeleteBlock.java     |  9 ++-
 .../apache/hudi/common/util/TestParquetUtils.java  | 27 ++++----
 .../hudi/common/util/TestSerializationUtils.java   |  7 +-
 .../io/storage/TestHoodieReaderWriterBase.java     | 14 ++--
 .../hudi/hadoop/testutils/InputFormatTestUtil.java |  4 +-
 .../TestHoodieSparkMergeOnReadTableClustering.java | 13 ++--
 .../hudi/functional/TestMORDataSourceStorage.scala |  2 +
 .../apache/hudi/hive/testutils/HiveTestUtil.java   |  2 +-
 48 files changed, 468 insertions(+), 218 deletions(-)

diff --git 
a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
 
b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
index 12de150f2d3..9bfbdfc4edd 100644
--- 
a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
+++ 
b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
@@ -121,7 +121,7 @@ public class TestHoodieLogFileCommand extends 
CLIFunctionalTestHarness {
       Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
       header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, INSTANT_TIME);
       header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, 
getSimpleSchema().toString());
-      dataBlock = new HoodieAvroDataBlock(records, header, 
HoodieRecord.RECORD_KEY_METADATA_FIELD);
+      dataBlock = new HoodieAvroDataBlock(records, false, header, 
HoodieRecord.RECORD_KEY_METADATA_FIELD);
       writer.appendBlock(dataBlock);
 
       Map<HoodieLogBlock.HeaderMetadataType, String> rollbackHeader = new 
HashMap<>();
@@ -217,7 +217,7 @@ public class TestHoodieLogFileCommand extends 
CLIFunctionalTestHarness {
       Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
       header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, INSTANT_TIME);
       header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
-      HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1, 
header, HoodieRecord.RECORD_KEY_METADATA_FIELD);
+      HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1, false, 
header, HoodieRecord.RECORD_KEY_METADATA_FIELD);
       writer.appendBlock(dataBlock);
     } finally {
       if (writer != null) {
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 87734c042d3..4a0087bc0a3 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -752,6 +752,14 @@ public class HoodieWriteConfig extends HoodieConfig {
           + "The class must be a subclass of 
`org.apache.hudi.callback.HoodieClientInitCallback`."
           + "By default, no Hudi client init callback is executed.");
 
+  public static final ConfigProperty<Boolean> WRITE_RECORD_POSITIONS = 
ConfigProperty
+      .key("hoodie.write.record.positions")
+      .defaultValue(false)
+      .markAdvanced()
+      .sinceVersion("1.0.0")
+      .withDocumentation("Whether to write record positions to the block 
header for data blocks containing updates and delete blocks. "
+          + "The record positions can be used to improve the performance of 
merging records from base and log files.");
+
   /**
    * Config key with boolean value that indicates whether record being written 
during MERGE INTO Spark SQL
    * operation are already prepped.
@@ -2052,6 +2060,10 @@ public class HoodieWriteConfig extends HoodieConfig {
     return getLong(HoodieStorageConfig.LOGFILE_DATA_BLOCK_MAX_SIZE);
   }
 
+  public boolean shouldWriteRecordPositions() {
+    return getBoolean(WRITE_RECORD_POSITIONS);
+  }
+
   public double getParquetCompressionRatio() {
     return getDouble(HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION);
   }
@@ -3090,6 +3102,11 @@ public class HoodieWriteConfig extends HoodieConfig {
       return this;
     }
 
+    public Builder withWriteRecordPositionsEnabled(boolean 
shouldWriteRecordPositions) {
+      writeConfig.setValue(WRITE_RECORD_POSITIONS, 
String.valueOf(shouldWriteRecordPositions));
+      return this;
+    }
+
     protected void setDefaults() {
       writeConfig.setDefaultValue(MARKERS_TYPE, 
getDefaultMarkersType(engineType));
       // Check for mandatory properties
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
index 24a4dc05d10..33e8d501943 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
@@ -61,7 +61,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import java.util.Set;
-import java.util.TreeSet;
+import java.util.stream.Collectors;
 
 import static java.util.stream.Collectors.toList;
 import static 
org.apache.hudi.table.action.commit.HoodieDeleteHelper.createDeleteRecord;
@@ -173,18 +173,18 @@ public class HoodieIndexUtils {
    *
    * @param filePath            - File to filter keys from
    * @param candidateRecordKeys - Candidate keys to filter
-   * @return List of candidate keys that are available in the file
+   * @return List of pairs of candidate keys and positions that are available 
in the file
    */
-  public static List<String> filterKeysFromFile(Path filePath, List<String> 
candidateRecordKeys,
-                                                Configuration configuration) 
throws HoodieIndexException {
+  public static List<Pair<String, Long>> filterKeysFromFile(Path filePath, 
List<String> candidateRecordKeys,
+                                                            Configuration 
configuration) throws HoodieIndexException {
     ValidationUtils.checkArgument(FSUtils.isBaseFile(filePath));
-    List<String> foundRecordKeys = new ArrayList<>();
+    List<Pair<String, Long>> foundRecordKeys = new ArrayList<>();
     try (HoodieFileReader fileReader = 
HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO)
         .getFileReader(configuration, filePath)) {
       // Load all rowKeys from the file, to double-confirm
       if (!candidateRecordKeys.isEmpty()) {
         HoodieTimer timer = HoodieTimer.start();
-        Set<String> fileRowKeys = fileReader.filterRowKeys(new 
TreeSet<>(candidateRecordKeys));
+        Set<Pair<String, Long>> fileRowKeys = 
fileReader.filterRowKeys(candidateRecordKeys.stream().collect(Collectors.toSet()));
         foundRecordKeys.addAll(fileRowKeys);
         LOG.info(String.format("Checked keys against file %s, in %d ms. 
#candidates (%d) #found (%d)", filePath,
             timer.endTimer(), candidateRecordKeys.size(), 
foundRecordKeys.size()));
@@ -282,9 +282,14 @@ public class HoodieIndexUtils {
     HoodieData<HoodieRecord<R>> untaggedUpdatingRecords = 
incomingRecordsAndLocations.filter(p -> 
p.getRight().isPresent()).map(Pair::getLeft)
         .distinctWithKey(HoodieRecord::getRecordKey, 
config.getGlobalIndexReconcileParallelism());
     // the tagging partitions and locations
+    // NOTE: The incoming records may only differ in record position, however, 
for the purpose of
+    //       merging in case of partition updates, it is safe to ignore the 
record positions.
     HoodieData<HoodieRecordGlobalLocation> globalLocations = 
incomingRecordsAndLocations
         .filter(p -> p.getRight().isPresent())
-        .map(p -> p.getRight().get())
+        .map(p -> new HoodieRecordGlobalLocation(
+            p.getRight().get().getPartitionPath(),
+            p.getRight().get().getInstantTime(),
+            p.getRight().get().getFileId()))
         .distinct(config.getGlobalIndexReconcileParallelism());
     // merged existing records with current locations being set
     HoodieData<HoodieRecord<R>> existingRecords = 
getExistingRecords(globalLocations, config, hoodieTable);
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/ListBasedHoodieBloomIndexHelper.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/ListBasedHoodieBloomIndexHelper.java
index b47f5cf066c..03e4441393d 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/ListBasedHoodieBloomIndexHelper.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/ListBasedHoodieBloomIndexHelper.java
@@ -64,22 +64,23 @@ public class ListBasedHoodieBloomIndexHelper extends 
BaseHoodieBloomIndexHelper
 
     List<HoodieKeyLookupResult> keyLookupResults =
         CollectionUtils.toStream(
-          new HoodieBloomIndexCheckFunction<Pair<HoodieFileGroupId, 
String>>(hoodieTable, config, Pair::getLeft, Pair::getRight)
-              .apply(fileComparisonPairList.iterator())
-        )
+            new HoodieBloomIndexCheckFunction<Pair<HoodieFileGroupId, 
String>>(hoodieTable, config, Pair::getLeft, Pair::getRight)
+                .apply(fileComparisonPairList.iterator())
+            )
             .flatMap(Collection::stream)
-            .filter(lr -> lr.getMatchingRecordKeys().size() > 0)
+            .filter(lr -> lr.getMatchingRecordKeysAndPositions().size() > 0)
             .collect(toList());
 
     return context.parallelize(keyLookupResults).flatMap(lookupResult ->
-        lookupResult.getMatchingRecordKeys().stream()
+        lookupResult.getMatchingRecordKeysAndPositions().stream()
             .map(recordKey -> new ImmutablePair<>(lookupResult, 
recordKey)).iterator()
     ).mapToPair(pair -> {
       HoodieKeyLookupResult lookupResult = pair.getLeft();
-      String recordKey = pair.getRight();
+      String recordKey = pair.getRight().getLeft();
+      long recordPosition = pair.getRight().getRight();
       return new ImmutablePair<>(
           new HoodieKey(recordKey, lookupResult.getPartitionPath()),
-          new HoodieRecordLocation(lookupResult.getBaseInstantTime(), 
lookupResult.getFileId()));
+          new HoodieRecordLocation(lookupResult.getBaseInstantTime(), 
lookupResult.getFileId(), recordPosition));
     });
   }
 }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
index f4cd010da57..1e14ba724b7 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
@@ -49,6 +49,7 @@ import org.apache.hudi.common.util.DefaultSizeEstimator;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.SizeEstimator;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieAppendException;
 import org.apache.hudi.exception.HoodieException;
@@ -87,10 +88,11 @@ public class HoodieAppendHandle<T, I, K, O> extends 
HoodieWriteHandle<T, I, K, O
   private static final int NUMBER_OF_RECORDS_TO_ESTIMATE_RECORD_SIZE = 100;
 
   protected final String fileId;
+  private final boolean shouldWriteRecordPositions;
   // Buffer for holding records in memory before they are flushed to disk
   private final List<HoodieRecord> recordList = new ArrayList<>();
-  // Buffer for holding records (to be deleted) in memory before they are 
flushed to disk
-  private final List<DeleteRecord> recordsToDelete = new ArrayList<>();
+  // Buffer for holding records (to be deleted), along with their position in 
log block, in memory before they are flushed to disk
+  private final List<Pair<DeleteRecord, Long>> recordsToDeleteWithPositions = 
new ArrayList<>();
   // Incoming records to be written to logs.
   protected Iterator<HoodieRecord<T>> recordItr;
   // Writer to log into the file group's latest slice.
@@ -155,6 +157,7 @@ public class HoodieAppendHandle<T, I, K, O> extends 
HoodieWriteHandle<T, I, K, O
     this.statuses = new ArrayList<>();
     this.recordProperties.putAll(config.getProps());
     this.attemptNumber = taskContextSupplier.getAttemptNumberSupplier().get();
+    this.shouldWriteRecordPositions = config.shouldWriteRecordPositions();
   }
 
   public HoodieAppendHandle(HoodieWriteConfig config, String instantTime, 
HoodieTable<T, I, K, O> hoodieTable,
@@ -459,12 +462,14 @@ public class HoodieAppendHandle<T, I, K, O> extends 
HoodieWriteHandle<T, I, K, O
             ? HoodieRecord.RECORD_KEY_METADATA_FIELD
             : 
hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp();
 
-        blocks.add(getBlock(config, pickLogDataBlockFormat(), recordList, 
getUpdatedHeader(header, blockSequenceNumber++, attemptNumber, config,
+        blocks.add(getBlock(config, pickLogDataBlockFormat(), recordList, 
shouldWriteRecordPositions,
+            getUpdatedHeader(header, blockSequenceNumber++, attemptNumber, 
config,
             addBlockIdentifier()), keyField));
       }
 
-      if (appendDeleteBlocks && recordsToDelete.size() > 0) {
-        blocks.add(new HoodieDeleteBlock(recordsToDelete.toArray(new 
DeleteRecord[0]), getUpdatedHeader(header, blockSequenceNumber++, 
attemptNumber, config,
+      if (appendDeleteBlocks && recordsToDeleteWithPositions.size() > 0) {
+        blocks.add(new HoodieDeleteBlock(recordsToDeleteWithPositions, 
shouldWriteRecordPositions,
+            getUpdatedHeader(header, blockSequenceNumber++, attemptNumber, 
config,
             addBlockIdentifier())));
       }
 
@@ -473,7 +478,7 @@ public class HoodieAppendHandle<T, I, K, O> extends 
HoodieWriteHandle<T, I, K, O
         processAppendResult(appendResult, recordList);
         recordList.clear();
         if (appendDeleteBlocks) {
-          recordsToDelete.clear();
+          recordsToDeleteWithPositions.clear();
         }
       }
     } catch (Exception e) {
@@ -594,7 +599,8 @@ public class HoodieAppendHandle<T, I, K, O> extends 
HoodieWriteHandle<T, I, K, O
         LOG.error("Error writing record  " + indexedRecord.get(), e);
       }
     } else {
-      recordsToDelete.add(DeleteRecord.create(record.getKey(), orderingVal));
+      long position = shouldWriteRecordPositions ? record.getCurrentPosition() 
: -1L;
+      
recordsToDeleteWithPositions.add(Pair.of(DeleteRecord.create(record.getKey(), 
orderingVal), position));
     }
     numberOfRecords++;
   }
@@ -652,17 +658,19 @@ public class HoodieAppendHandle<T, I, K, O> extends 
HoodieWriteHandle<T, I, K, O
   private static HoodieLogBlock getBlock(HoodieWriteConfig writeConfig,
                                          HoodieLogBlock.HoodieLogBlockType 
logDataBlockFormat,
                                          List<HoodieRecord> records,
+                                         boolean shouldWriteRecordPositions,
                                          Map<HeaderMetadataType, String> 
header,
                                          String keyField) {
     switch (logDataBlockFormat) {
       case AVRO_DATA_BLOCK:
-        return new HoodieAvroDataBlock(records, header, keyField);
+        return new HoodieAvroDataBlock(records, shouldWriteRecordPositions, 
header, keyField);
       case HFILE_DATA_BLOCK:
         return new HoodieHFileDataBlock(
             records, header, writeConfig.getHFileCompressionAlgorithm(), new 
Path(writeConfig.getBasePath()));
       case PARQUET_DATA_BLOCK:
         return new HoodieParquetDataBlock(
             records,
+            shouldWriteRecordPositions,
             header,
             keyField,
             writeConfig.getParquetCompressionCodec(),
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java
index ae643b80cbc..135e4866cc5 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java
@@ -51,26 +51,28 @@ public class HoodieKeyLocationFetchHandle<T, I, K, O> 
extends HoodieReadHandle<T
     this.keyGeneratorOpt = keyGeneratorOpt;
   }
 
-  private List<HoodieKey> fetchHoodieKeys(HoodieBaseFile baseFile) {
+  private List<Pair<HoodieKey, Long>> 
fetchRecordKeysWithPositions(HoodieBaseFile baseFile) {
     BaseFileUtils baseFileUtils = 
BaseFileUtils.getInstance(baseFile.getPath());
     if (keyGeneratorOpt.isPresent()) {
-      return baseFileUtils.fetchHoodieKeys(hoodieTable.getHadoopConf(), new 
Path(baseFile.getPath()), keyGeneratorOpt);
+      return 
baseFileUtils.fetchRecordKeysWithPositions(hoodieTable.getHadoopConf(), new 
Path(baseFile.getPath()), keyGeneratorOpt);
     } else {
-      return baseFileUtils.fetchHoodieKeys(hoodieTable.getHadoopConf(), new 
Path(baseFile.getPath()));
+      return 
baseFileUtils.fetchRecordKeysWithPositions(hoodieTable.getHadoopConf(), new 
Path(baseFile.getPath()));
     }
   }
 
   public Stream<Pair<HoodieKey, HoodieRecordLocation>> locations() {
     HoodieBaseFile baseFile = partitionPathBaseFilePair.getRight();
-    return fetchHoodieKeys(baseFile).stream()
-        .map(entry -> Pair.of(entry,
-            new HoodieRecordLocation(baseFile.getCommitTime(), 
baseFile.getFileId())));
+    return fetchRecordKeysWithPositions(baseFile).stream()
+        .map(entry -> Pair.of(entry.getLeft(),
+            new HoodieRecordLocation(baseFile.getCommitTime(), 
baseFile.getFileId(), entry.getRight())));
   }
 
   public Stream<Pair<String, HoodieRecordGlobalLocation>> globalLocations() {
     HoodieBaseFile baseFile = partitionPathBaseFilePair.getRight();
-    return fetchHoodieKeys(baseFile).stream()
-        .map(entry -> Pair.of(entry.getRecordKey(),
-            new HoodieRecordGlobalLocation(entry.getPartitionPath(), 
baseFile.getCommitTime(), baseFile.getFileId())));
+    return fetchRecordKeysWithPositions(baseFile).stream()
+        .map(entry -> Pair.of(entry.getLeft().getRecordKey(),
+            new HoodieRecordGlobalLocation(
+                entry.getLeft().getPartitionPath(), baseFile.getCommitTime(),
+                baseFile.getFileId(), entry.getRight())));
   }
 }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLookupHandle.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLookupHandle.java
index 9590e8fcc2e..db3be1c1842 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLookupHandle.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLookupHandle.java
@@ -101,12 +101,12 @@ public class HoodieKeyLookupHandle<T, I, K, O> extends 
HoodieReadHandle<T, I, K,
     }
 
     HoodieBaseFile baseFile = getLatestBaseFile();
-    List<String> matchingKeys = HoodieIndexUtils.filterKeysFromFile(new 
Path(baseFile.getPath()), candidateRecordKeys,
-        hoodieTable.getHadoopConf());
+    List<Pair<String, Long>> matchingKeysAndPositions = 
HoodieIndexUtils.filterKeysFromFile(
+        new Path(baseFile.getPath()), candidateRecordKeys, 
hoodieTable.getHadoopConf());
     LOG.info(
         String.format("Total records (%d), bloom filter candidates 
(%d)/fp(%d), actual matches (%d)", totalKeysChecked,
-            candidateRecordKeys.size(), candidateRecordKeys.size() - 
matchingKeys.size(), matchingKeys.size()));
+            candidateRecordKeys.size(), candidateRecordKeys.size() - 
matchingKeysAndPositions.size(), matchingKeysAndPositions.size()));
     return new HoodieKeyLookupResult(partitionPathFileIDPair.getRight(), 
partitionPathFileIDPair.getLeft(),
-        baseFile.getCommitTime(), matchingKeys);
+        baseFile.getCommitTime(), matchingKeysAndPositions);
   }
 }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLookupResult.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLookupResult.java
index 19096a21d87..b64e57e0f03 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLookupResult.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLookupResult.java
@@ -18,6 +18,8 @@
 
 package org.apache.hudi.io;
 
+import org.apache.hudi.common.util.collection.Pair;
+
 import java.util.List;
 
 /**
@@ -27,15 +29,15 @@ public class HoodieKeyLookupResult {
 
   private final String fileId;
   private final String baseInstantTime;
-  private final List<String> matchingRecordKeys;
+  private final List<Pair<String, Long>> matchingRecordKeysAndPositions;
   private final String partitionPath;
 
   public HoodieKeyLookupResult(String fileId, String partitionPath, String 
baseInstantTime,
-                               List<String> matchingRecordKeys) {
+                               List<Pair<String, Long>> 
matchingRecordKeysAndPositions) {
     this.fileId = fileId;
     this.partitionPath = partitionPath;
     this.baseInstantTime = baseInstantTime;
-    this.matchingRecordKeys = matchingRecordKeys;
+    this.matchingRecordKeysAndPositions = matchingRecordKeysAndPositions;
   }
 
   public String getFileId() {
@@ -50,8 +52,8 @@ public class HoodieKeyLookupResult {
     return partitionPath;
   }
 
-  public List<String> getMatchingRecordKeys() {
-    return matchingRecordKeys;
+  public List<Pair<String, Long>> getMatchingRecordKeysAndPositions() {
+    return matchingRecordKeysAndPositions;
   }
 }
 
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 6a49daf817d..0c1bb81f01e 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -33,7 +33,6 @@ import org.apache.hudi.common.data.HoodiePairData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.function.SerializableFunction;
-import org.apache.hudi.common.model.DeleteRecord;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -709,7 +708,8 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
     engineContext.foreach(fileGroupFileIds, fileGroupFileId -> {
       try {
         final Map<HeaderMetadataType, String> blockHeader = 
Collections.singletonMap(HeaderMetadataType.INSTANT_TIME, instantTime);
-        final HoodieDeleteBlock block = new HoodieDeleteBlock(new 
DeleteRecord[0], blockHeader);
+
+        final HoodieDeleteBlock block = new 
HoodieDeleteBlock(Collections.emptyList(), false, blockHeader);
 
         HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
             
.onParentPath(FSUtils.getPartitionPath(metadataWriteConfig.getBasePath(), 
metadataPartition.getPartitionPath()))
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/utils/TestLegacyArchivedMetaEntryReader.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/utils/TestLegacyArchivedMetaEntryReader.java
index e104bef23fb..0423d29a5f4 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/utils/TestLegacyArchivedMetaEntryReader.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/utils/TestLegacyArchivedMetaEntryReader.java
@@ -134,7 +134,7 @@ public class TestLegacyArchivedMetaEntryReader {
       header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, 
wrapperSchema.toString());
       final String keyField = 
metaClient.getTableConfig().getRecordKeyFieldProp();
       List<HoodieRecord> indexRecords = 
records.stream().map(HoodieAvroIndexedRecord::new).collect(Collectors.toList());
-      HoodieAvroDataBlock block = new HoodieAvroDataBlock(indexRecords, 
header, keyField);
+      HoodieAvroDataBlock block = new HoodieAvroDataBlock(indexRecords, false, 
header, keyField);
       writer.appendBlock(block);
       records.clear();
     }
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java
index af5d3e9a68d..bb87bc24e78 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java
@@ -183,7 +183,8 @@ public class HoodieWriteableTestTable extends 
HoodieMetadataTestTable {
           LOG.warn("Failed to convert record " + r.toString(), e);
           return null;
         }
-      }).map(HoodieAvroIndexedRecord::new).collect(Collectors.toList()), 
header, HoodieRecord.RECORD_KEY_METADATA_FIELD));
+      }).map(HoodieAvroIndexedRecord::new).collect(Collectors.toList()),
+          false, header, HoodieRecord.RECORD_KEY_METADATA_FIELD));
       return Pair.of(partitionPath, logWriter.getLogFile());
     }
   }
diff --git 
a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java
 
b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java
index d4b4007bedb..f9a6c0d5ea1 100644
--- 
a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java
+++ 
b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java
@@ -246,14 +246,13 @@ public class TestFlinkHoodieBloomIndex extends 
HoodieFlinkClientTestHarness {
     List<String> uuids = asList(record1.getRecordKey(), 
record2.getRecordKey(), record3.getRecordKey(), record4.getRecordKey());
 
     HoodieWriteConfig config = 
HoodieWriteConfig.newBuilder().withPath(basePath).build();
-    HoodieFlinkTable table = HoodieFlinkTable.create(config, context, 
metaClient);
-    List<String> results = HoodieIndexUtils.filterKeysFromFile(
+    List<Pair<String, Long>> results = HoodieIndexUtils.filterKeysFromFile(
         new Path(java.nio.file.Paths.get(basePath, partition, 
filename).toString()), uuids, hadoopConf);
     assertEquals(results.size(), 2);
-    assertTrue(results.get(0).equals("1eb5b87a-1feh-4edd-87b4-6ec96dc405a0")
-        || results.get(1).equals("1eb5b87a-1feh-4edd-87b4-6ec96dc405a0"));
-    assertTrue(results.get(0).equals("2eb5b87b-1feu-4edd-87b4-6ec96dc405a0")
-        || results.get(1).equals("2eb5b87b-1feu-4edd-87b4-6ec96dc405a0"));
+    
assertTrue(results.get(0).getLeft().equals("1eb5b87a-1feh-4edd-87b4-6ec96dc405a0")
+        || 
results.get(1).getLeft().equals("1eb5b87a-1feh-4edd-87b4-6ec96dc405a0"));
+    
assertTrue(results.get(0).getLeft().equals("2eb5b87b-1feu-4edd-87b4-6ec96dc405a0")
+        || 
results.get(1).getLeft().equals("2eb5b87b-1feu-4edd-87b4-6ec96dc405a0"));
     // TODO(vc): Need more coverage on actual filenames
     // assertTrue(results.get(0)._2().equals(filename));
     // assertTrue(results.get(1)._2().equals(filename));
diff --git 
a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/testutils/HoodieFlinkWriteableTestTable.java
 
b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/testutils/HoodieFlinkWriteableTestTable.java
index e9c5b6f6f5b..9ab96369d8e 100644
--- 
a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/testutils/HoodieFlinkWriteableTestTable.java
+++ 
b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/testutils/HoodieFlinkWriteableTestTable.java
@@ -145,7 +145,7 @@ public class HoodieFlinkWriteableTestTable extends 
HoodieWriteableTestTable {
           LOG.warn("Failed to convert record " + r.toString(), e);
           return null;
         }
-      }).map(HoodieAvroIndexedRecord::new).collect(Collectors.toList()), 
header, HoodieRecord.RECORD_KEY_METADATA_FIELD));
+      }).map(HoodieAvroIndexedRecord::new).collect(Collectors.toList()), 
false, header, HoodieRecord.RECORD_KEY_METADATA_FIELD));
       return Pair.of(partitionPath, logWriter.getLogFile());
     }
   }
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieFileProbingFunction.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieFileProbingFunction.java
index 2b6a96b3d05..403eb35926c 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieFileProbingFunction.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieFileProbingFunction.java
@@ -127,14 +127,15 @@ public class HoodieFileProbingFunction implements
             // TODO add assertion that file is checked only once
 
             final HoodieBaseFile dataFile = fileIDBaseFileMap.get(fileId);
-            List<String> matchingKeys = 
HoodieIndexUtils.filterKeysFromFile(new Path(dataFile.getPath()),
-                candidateRecordKeys, hadoopConf.get());
+            List<Pair<String, Long>> matchingKeysAndPositions = 
HoodieIndexUtils.filterKeysFromFile(
+                new Path(dataFile.getPath()), candidateRecordKeys, 
hadoopConf.get());
 
             LOG.debug(
                 String.format("Bloom filter candidates (%d) / false positives 
(%d), actual matches (%d)",
-                    candidateRecordKeys.size(), candidateRecordKeys.size() - 
matchingKeys.size(), matchingKeys.size()));
+                    candidateRecordKeys.size(), candidateRecordKeys.size() - 
matchingKeysAndPositions.size(),
+                    matchingKeysAndPositions.size()));
 
-            return new HoodieKeyLookupResult(fileId, partitionPath, 
dataFile.getCommitTime(), matchingKeys);
+            return new HoodieKeyLookupResult(fileId, partitionPath, 
dataFile.getCommitTime(), matchingKeysAndPositions);
           })
           .collect(Collectors.toList());
     }
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java
index 37ce8740af5..cd5f14d8fdf 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java
@@ -173,10 +173,12 @@ public class SparkHoodieBloomIndexHelper extends 
BaseHoodieBloomIndexHelper {
     }
 
     return HoodieJavaPairRDD.of(keyLookupResultRDD.flatMap(List::iterator)
-        .filter(lr -> lr.getMatchingRecordKeys().size() > 0)
-        .flatMapToPair(lookupResult -> 
lookupResult.getMatchingRecordKeys().stream()
-            .map(recordKey -> new Tuple2<>(new HoodieKey(recordKey, 
lookupResult.getPartitionPath()),
-                new HoodieRecordLocation(lookupResult.getBaseInstantTime(), 
lookupResult.getFileId())))
+        .filter(lr -> lr.getMatchingRecordKeysAndPositions().size() > 0)
+        .flatMapToPair(lookupResult -> 
lookupResult.getMatchingRecordKeysAndPositions().stream()
+            .map(recordKeyAndPosition -> new Tuple2<>(
+                new HoodieKey(recordKeyAndPosition.getLeft(), 
lookupResult.getPartitionPath()),
+                new HoodieRecordLocation(lookupResult.getBaseInstantTime(), 
lookupResult.getFileId(),
+                    recordKeyAndPosition.getRight())))
             .collect(Collectors.toList()).iterator()));
   }
 
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java
index 2a22eacea8c..046fa98fbea 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java
@@ -18,22 +18,23 @@
 
 package org.apache.hudi.io.storage;
 
-import org.apache.avro.Schema;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 import org.apache.hudi.SparkAdapterSupport$;
 import org.apache.hudi.avro.HoodieAvroUtils;
-import org.apache.hudi.common.model.HoodieSparkRecord;
 import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieSparkRecord;
 import org.apache.hudi.common.util.BaseFileUtils;
-import org.apache.hudi.common.util.collection.ClosableIterator;
-import org.apache.hudi.common.util.collection.CloseableMappingIterator;
 import org.apache.hudi.common.util.ParquetReaderIterator;
 import org.apache.hudi.common.util.ParquetUtils;
 import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.CloseableMappingIterator;
+import org.apache.hudi.common.util.collection.Pair;
 
+import org.apache.avro.Schema;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.parquet.hadoop.ParquetReader;
 import org.apache.parquet.hadoop.api.ReadSupport;
 import org.apache.parquet.schema.MessageType;
@@ -80,7 +81,7 @@ public class HoodieSparkParquetReader implements 
HoodieSparkFileReader {
   }
 
   @Override
-  public Set<String> filterRowKeys(Set<String> candidateRowKeys) {
+  public Set<Pair<String, Long>> filterRowKeys(Set<String> candidateRowKeys) {
     return parquetUtils.filterRowKeys(conf, path, candidateRowKeys);
   }
 
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
index 34e144dcb82..9edf1018582 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
@@ -330,15 +330,14 @@ public class TestHoodieBloomIndex extends 
TestHoodieMetadataBase {
         Arrays.asList(record1.getRecordKey(), record2.getRecordKey(), 
record3.getRecordKey(), record4.getRecordKey());
 
     HoodieWriteConfig config = 
HoodieWriteConfig.newBuilder().withPath(basePath).build();
-    HoodieSparkTable table = HoodieSparkTable.create(config, context, 
metaClient);
-    List<String> results = HoodieIndexUtils.filterKeysFromFile(
+    List<Pair<String, Long>> results = HoodieIndexUtils.filterKeysFromFile(
         new Path(Paths.get(basePath, partition, filename).toString()), uuids, 
hadoopConf);
 
     assertEquals(results.size(), 2);
-    assertTrue(results.get(0).equals("1eb5b87a-1feh-4edd-87b4-6ec96dc405a0")
-        || results.get(1).equals("1eb5b87a-1feh-4edd-87b4-6ec96dc405a0"));
-    assertTrue(results.get(0).equals("2eb5b87b-1feu-4edd-87b4-6ec96dc405a0")
-        || results.get(1).equals("2eb5b87b-1feu-4edd-87b4-6ec96dc405a0"));
+    
assertTrue(results.get(0).getLeft().equals("1eb5b87a-1feh-4edd-87b4-6ec96dc405a0")
+        || 
results.get(1).getLeft().equals("1eb5b87a-1feh-4edd-87b4-6ec96dc405a0"));
+    
assertTrue(results.get(0).getLeft().equals("2eb5b87b-1feu-4edd-87b4-6ec96dc405a0")
+        || 
results.get(1).getLeft().equals("2eb5b87b-1feu-4edd-87b4-6ec96dc405a0"));
     // TODO(vc): Need more coverage on actual filenames
     // assertTrue(results.get(0)._2().equals(filename));
     // assertTrue(results.get(1)._2().equals(filename));
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieKeyLocationFetchHandle.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieKeyLocationFetchHandle.java
index 3e2620c1e4b..1acfe309a2f 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieKeyLocationFetchHandle.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieKeyLocationFetchHandle.java
@@ -147,8 +147,10 @@ public class TestHoodieKeyLocationFetchHandle extends 
HoodieSparkClientTestHarne
         String fileId = 
testTable.addCommit(instantTime).getFileIdWithInserts(entry.getKey(), 
recordsPerSlice.toArray(new HoodieRecord[0]));
         Tuple2<String, String> fileIdInstantTimePair = new Tuple2<>(fileId, 
instantTime);
         List<Tuple2<HoodieKey, HoodieRecordLocation>> expectedEntries = new 
ArrayList<>();
+        // record position
+        long position = 0;
         for (HoodieRecord record : recordsPerSlice) {
-          expectedEntries.add(new Tuple2<>(record.getKey(), new 
HoodieRecordLocation(fileIdInstantTimePair._2, fileIdInstantTimePair._1)));
+          expectedEntries.add(new Tuple2<>(record.getKey(), new 
HoodieRecordLocation(fileIdInstantTimePair._2, fileIdInstantTimePair._1, 
position++)));
         }
         expectedList.put(new Tuple2<>(entry.getKey(), 
fileIdInstantTimePair._1), expectedEntries);
       }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
index 2a519d1334b..827c143d877 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
@@ -18,16 +18,17 @@
 
 package org.apache.hudi.common.model;
 
+import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.keygen.BaseKeyGenerator;
+
 import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.KryoSerializable;
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
-import org.apache.hudi.common.util.CollectionUtils;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.keygen.BaseKeyGenerator;
 
 import javax.annotation.Nullable;
 
@@ -248,6 +249,13 @@ public abstract class HoodieRecord<T> implements 
HoodieRecordCompatibilityInterf
     return this.currentLocation != null;
   }
 
+  public long getCurrentPosition() {
+    if (isCurrentLocationKnown()) {
+      return this.currentLocation.getPosition();
+    }
+    return HoodieRecordLocation.INVALID_POSITION;
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordGlobalLocation.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordGlobalLocation.java
index 4121a334548..ed0e4cbe905 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordGlobalLocation.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordGlobalLocation.java
@@ -40,12 +40,18 @@ public final class HoodieRecordGlobalLocation extends 
HoodieRecordLocation {
     this.partitionPath = partitionPath;
   }
 
+  public HoodieRecordGlobalLocation(String partitionPath, String instantTime, 
String fileId, long position) {
+    super(instantTime, fileId, position);
+    this.partitionPath = partitionPath;
+  }
+
   @Override
   public String toString() {
     final StringBuilder sb = new StringBuilder("HoodieGlobalRecordLocation {");
     sb.append("partitionPath=").append(partitionPath).append(", ");
     sb.append("instantTime=").append(instantTime).append(", ");
-    sb.append("fileId=").append(fileId);
+    sb.append("fileId=").append(fileId).append(", ");
+    sb.append("position=").append(position);
     sb.append('}');
     return sb.toString();
   }
@@ -61,7 +67,8 @@ public final class HoodieRecordGlobalLocation extends 
HoodieRecordLocation {
     HoodieRecordGlobalLocation otherLoc = (HoodieRecordGlobalLocation) o;
     return Objects.equals(partitionPath, otherLoc.partitionPath)
         && Objects.equals(instantTime, otherLoc.instantTime)
-        && Objects.equals(fileId, otherLoc.fileId);
+        && Objects.equals(fileId, otherLoc.fileId)
+        && Objects.equals(position, otherLoc.position);
   }
 
   @Override
@@ -88,14 +95,14 @@ public final class HoodieRecordGlobalLocation extends 
HoodieRecordLocation {
    * Returns the record location as local.
    */
   public HoodieRecordLocation toLocal(String instantTime) {
-    return new HoodieRecordLocation(instantTime, fileId);
+    return new HoodieRecordLocation(instantTime, fileId, position);
   }
 
   /**
    * Copy the location with given partition path.
    */
   public HoodieRecordGlobalLocation copy(String partitionPath) {
-    return new HoodieRecordGlobalLocation(partitionPath, instantTime, fileId);
+    return new HoodieRecordGlobalLocation(partitionPath, instantTime, fileId, 
position);
   }
 
   @Override
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordLocation.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordLocation.java
index 8b1dd2b378a..15f72918096 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordLocation.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordLocation.java
@@ -30,16 +30,26 @@ import java.util.Objects;
  * Location of a HoodieRecord within the partition it belongs to. Ultimately, 
this points to an actual file on disk
  */
 public class HoodieRecordLocation implements Serializable, KryoSerializable {
+  public static final long INVALID_POSITION = -1L;
 
   protected String instantTime;
   protected String fileId;
+  // Position of the record in the file, e.g., row position starting from 0 in 
the Parquet file
+  // Valid position should be non-negative. Negative position, i.e., -1, means 
it's invalid
+  // and should not be used
+  protected long position;
 
   public HoodieRecordLocation() {
   }
 
   public HoodieRecordLocation(String instantTime, String fileId) {
+    this(instantTime, fileId, INVALID_POSITION);
+  }
+
+  public HoodieRecordLocation(String instantTime, String fileId, long 
position) {
     this.instantTime = instantTime;
     this.fileId = fileId;
+    this.position = position;
   }
 
   @Override
@@ -51,19 +61,21 @@ public class HoodieRecordLocation implements Serializable, 
KryoSerializable {
       return false;
     }
     HoodieRecordLocation otherLoc = (HoodieRecordLocation) o;
-    return Objects.equals(instantTime, otherLoc.instantTime) && 
Objects.equals(fileId, otherLoc.fileId);
+    return Objects.equals(instantTime, otherLoc.instantTime) && 
Objects.equals(fileId, otherLoc.fileId)
+        && Objects.equals(position, otherLoc.position);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(instantTime, fileId);
+    return Objects.hash(instantTime, fileId, position);
   }
 
   @Override
   public String toString() {
     final StringBuilder sb = new StringBuilder("HoodieRecordLocation {");
     sb.append("instantTime=").append(instantTime).append(", ");
-    sb.append("fileId=").append(fileId);
+    sb.append("fileId=").append(fileId).append(", ");
+    sb.append("position=").append(position);
     sb.append('}');
     return sb.toString();
   }
@@ -84,15 +96,29 @@ public class HoodieRecordLocation implements Serializable, 
KryoSerializable {
     this.fileId = fileId;
   }
 
+  public static boolean isPositionValid(long position) {
+    return position > INVALID_POSITION;
+  }
+
+  public long getPosition() {
+    return position;
+  }
+
+  public void setPosition(long position) {
+    this.position = position;
+  }
+
   @Override
   public void write(Kryo kryo, Output output) {
     output.writeString(instantTime);
     output.writeString(fileId);
+    output.writeLong(position);
   }
 
   @Override
   public void read(Kryo kryo, Input input) {
     this.instantTime = input.readString();
     this.fileId = input.readString();
+    this.position = input.readLong();
   }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/LogReaderUtils.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/LogReaderUtils.java
index 0b1a1d5c84d..778a2d45cfd 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/LogReaderUtils.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/LogReaderUtils.java
@@ -42,6 +42,7 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
@@ -102,7 +103,7 @@ public class LogReaderUtils {
    * generated from serializing {@link Roaring64NavigableMap} bitmap using the 
portable format.
    * @throws IOException upon I/O error.
    */
-  public static String encodePositions(List<Long> positions) throws 
IOException {
+  public static String encodePositions(Set<Long> positions) throws IOException 
{
     Roaring64NavigableMap positionBitmap = new Roaring64NavigableMap();
     positions.forEach(positionBitmap::add);
     return encodePositions(positionBitmap);
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
index c2e0aa50364..ca568c0f38c 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
@@ -84,10 +84,11 @@ public class HoodieAvroDataBlock extends HoodieDataBlock {
   }
 
   public HoodieAvroDataBlock(@Nonnull List<HoodieRecord> records,
-      @Nonnull Map<HeaderMetadataType, String> header,
-      @Nonnull String keyField
+                             boolean shouldWriteRecordPositions,
+                             @Nonnull Map<HeaderMetadataType, String> header,
+                             @Nonnull String keyField
   ) {
-    super(records, header, new HashMap<>(), keyField);
+    super(records, shouldWriteRecordPositions, header, new HashMap<>(), 
keyField);
   }
 
   @Override
@@ -225,7 +226,7 @@ public class HoodieAvroDataBlock extends HoodieDataBlock {
    */
   @Deprecated
   public HoodieAvroDataBlock(List<HoodieRecord> records, Schema schema) {
-    super(records, Collections.singletonMap(HeaderMetadataType.SCHEMA, 
schema.toString()), new HashMap<>(), HoodieRecord.RECORD_KEY_METADATA_FIELD);
+    super(records, false, Collections.singletonMap(HeaderMetadataType.SCHEMA, 
schema.toString()), new HashMap<>(), HoodieRecord.RECORD_KEY_METADATA_FIELD);
   }
 
   public static HoodieAvroDataBlock getBlock(byte[] content, Schema 
readerSchema) throws IOException {
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCDCDataBlock.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCDCDataBlock.java
index 93bd41b88d0..f74a02e2d0b 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCDCDataBlock.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCDCDataBlock.java
@@ -48,7 +48,7 @@ public class HoodieCDCDataBlock extends HoodieAvroDataBlock {
   public HoodieCDCDataBlock(List<HoodieRecord> records,
                             Map<HeaderMetadataType, String> header,
                             String keyField) {
-    super(records, header, keyField);
+    super(records, false, header, keyField);
   }
 
   @Override
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
index 7415c0931ee..2b3dfaf6a61 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
@@ -27,6 +27,8 @@ import org.apache.hudi.exception.HoodieIOException;
 
 import org.apache.avro.Schema;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.HashSet;
@@ -38,6 +40,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
+import static 
org.apache.hudi.common.model.HoodieRecordLocation.isPositionValid;
 import static org.apache.hudi.common.util.TypeUtils.unsafeCast;
 import static org.apache.hudi.common.util.ValidationUtils.checkState;
 
@@ -51,6 +54,7 @@ import static 
org.apache.hudi.common.util.ValidationUtils.checkState;
  *   3. Actual serialized content of the records
  */
 public abstract class HoodieDataBlock extends HoodieLogBlock {
+  private static final Logger LOG = 
LoggerFactory.getLogger(HoodieDataBlock.class);
 
   // TODO rebase records/content to leverage Either to warrant
   //      that they are mutex (used by read/write flows respectively)
@@ -64,6 +68,7 @@ public abstract class HoodieDataBlock extends HoodieLogBlock {
   private final boolean enablePointLookups;
 
   protected Schema readerSchema;
+  protected final boolean shouldWriteRecordPositions;
 
   //  Map of string schema to parsed schema.
   private static ConcurrentHashMap<String, Schema> schemaMap = new 
ConcurrentHashMap<>();
@@ -72,14 +77,31 @@ public abstract class HoodieDataBlock extends 
HoodieLogBlock {
    * NOTE: This ctor is used on the write-path (ie when records ought to be 
written into the log)
    */
   public HoodieDataBlock(List<HoodieRecord> records,
+                         boolean shouldWriteRecordPositions,
                          Map<HeaderMetadataType, String> header,
                          Map<HeaderMetadataType, String> footer,
                          String keyFieldName) {
     super(header, footer, Option.empty(), Option.empty(), null, false);
+    if (shouldWriteRecordPositions && !records.isEmpty()) {
+      records.sort((o1, o2) -> {
+        long v1 = o1.getCurrentPosition();
+        long v2 = o2.getCurrentPosition();
+        return Long.compare(v1, v2);
+      });
+      if (isPositionValid(records.get(0).getCurrentPosition())) {
+        addRecordPositionsToHeader(
+            
records.stream().map(HoodieRecord::getCurrentPosition).collect(Collectors.toSet()),
+            records.size());
+      } else {
+        LOG.warn("There are records without valid positions. "
+            + "Skip writing record positions to the data block header.");
+      }
+    }
     this.records = Option.of(records);
     this.keyFieldName = keyFieldName;
     // If no reader-schema has been provided assume writer-schema as one
     this.readerSchema = getWriterSchema(super.getLogBlockHeader());
+    this.shouldWriteRecordPositions = shouldWriteRecordPositions;
     this.enablePointLookups = false;
   }
 
@@ -96,6 +118,8 @@ public abstract class HoodieDataBlock extends HoodieLogBlock 
{
                             String keyFieldName,
                             boolean enablePointLookups) {
     super(headers, footer, blockContentLocation, content, inputStream, 
readBlockLazily);
+    // Setting `shouldWriteRecordPositions` to false as this constructor is 
only used by the reader
+    this.shouldWriteRecordPositions = false;
     this.records = Option.empty();
     this.keyFieldName = keyFieldName;
     // If no reader-schema has been provided assume writer-schema as one
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDeleteBlock.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDeleteBlock.java
index 23ce76c5ef4..23c42b964d8 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDeleteBlock.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDeleteBlock.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.model.DeleteRecord;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.SerializationUtils;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.util.Lazy;
 
@@ -37,6 +38,8 @@ import org.apache.avro.io.EncoderFactory;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.avro.specific.SpecificDatumWriter;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -51,11 +54,13 @@ import java.util.stream.Collectors;
 
 import static org.apache.hudi.avro.HoodieAvroUtils.unwrapAvroValueWrapper;
 import static org.apache.hudi.avro.HoodieAvroUtils.wrapValueIntoAvro;
+import static 
org.apache.hudi.common.model.HoodieRecordLocation.isPositionValid;
 
 /**
  * Delete block contains a list of keys to be deleted from scanning the blocks 
so far.
  */
 public class HoodieDeleteBlock extends HoodieLogBlock {
+  private static final Logger LOG = 
LoggerFactory.getLogger(HoodieDeleteBlock.class);
   /**
    * These static builders are added to avoid performance issue in Avro 1.10.
    * You can find more details in HoodieAvroUtils, HUDI-3834, and AVRO-3048.
@@ -65,17 +70,44 @@ public class HoodieDeleteBlock extends HoodieLogBlock {
   private static final Lazy<HoodieDeleteRecord.Builder> 
HOODIE_DELETE_RECORD_BUILDER_STUB =
       Lazy.lazily(HoodieDeleteRecord::newBuilder);
 
+  private final boolean shouldWriteRecordPositions;
+  // Records to delete, sorted based on the record position if writing record 
position to the log block header
   private DeleteRecord[] recordsToDelete;
 
-  public HoodieDeleteBlock(DeleteRecord[] recordsToDelete, 
Map<HeaderMetadataType, String> header) {
-    this(Option.empty(), null, false, Option.empty(), header, new HashMap<>());
-    this.recordsToDelete = recordsToDelete;
+  public HoodieDeleteBlock(List<Pair<DeleteRecord, Long>> recordsToDelete,
+                           boolean shouldWriteRecordPositions,
+                           Map<HeaderMetadataType, String> header) {
+    this(Option.empty(), null, false, Option.empty(), header, new HashMap<>(), 
shouldWriteRecordPositions);
+    if (shouldWriteRecordPositions && !recordsToDelete.isEmpty()) {
+      recordsToDelete.sort((o1, o2) -> {
+        long v1 = o1.getRight();
+        long v2 = o2.getRight();
+        return Long.compare(v1, v2);
+      });
+      if (isPositionValid(recordsToDelete.get(0).getRight())) {
+        addRecordPositionsToHeader(
+            
recordsToDelete.stream().map(Pair::getRight).collect(Collectors.toSet()),
+            recordsToDelete.size());
+      } else {
+        LOG.warn("There are delete records without valid positions. "
+            + "Skip writing record positions to the delete block header.");
+      }
+    }
+    this.recordsToDelete = 
recordsToDelete.stream().map(Pair::getLeft).toArray(DeleteRecord[]::new);
   }
 
   public HoodieDeleteBlock(Option<byte[]> content, FSDataInputStream 
inputStream, boolean readBlockLazily,
                            Option<HoodieLogBlockContentLocation> 
blockContentLocation, Map<HeaderMetadataType, String> header,
                            Map<HeaderMetadataType, String> footer) {
+    // Setting `shouldWriteRecordPositions` to false as this constructor is 
only used by the reader
+    this(content, inputStream, readBlockLazily, blockContentLocation, header, 
footer, false);
+  }
+
+  HoodieDeleteBlock(Option<byte[]> content, FSDataInputStream inputStream, 
boolean readBlockLazily,
+                    Option<HoodieLogBlockContentLocation> 
blockContentLocation, Map<HeaderMetadataType, String> header,
+                    Map<HeaderMetadataType, String> footer, boolean 
shouldWriteRecordPositions) {
     super(header, footer, blockContentLocation, content, inputStream, 
readBlockLazily);
+    this.shouldWriteRecordPositions = shouldWriteRecordPositions;
   }
 
   @Override
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
index 3f213881b8d..d00d864c112 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
@@ -93,7 +93,7 @@ public class HoodieHFileDataBlock extends HoodieDataBlock {
                               Map<HeaderMetadataType, String> header,
                               Compression.Algorithm compressionAlgorithm,
                               Path pathForReader) {
-    super(records, header, new HashMap<>(), 
HoodieAvroHFileReader.KEY_FIELD_NAME);
+    super(records, false, header, new HashMap<>(), 
HoodieAvroHFileReader.KEY_FIELD_NAME);
     this.compressionAlgorithm = Option.of(compressionAlgorithm);
     this.pathForReader = pathForReader;
   }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
index 106c188f0f1..443bcab9b8e 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
@@ -29,6 +29,8 @@ import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.roaringbitmap.longlong.Roaring64NavigableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
@@ -40,6 +42,7 @@ import java.io.EOFException;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 
 import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
 import static org.apache.hudi.common.util.ValidationUtils.checkState;
@@ -48,7 +51,7 @@ import static 
org.apache.hudi.common.util.ValidationUtils.checkState;
  * Abstract class defining a block in HoodieLogFile.
  */
 public abstract class HoodieLogBlock {
-
+  private static final Logger LOG = 
LoggerFactory.getLogger(HoodieLogBlock.class);
   /**
    * The current version of the log block. Anytime the logBlock format changes 
this version needs to be bumped and
    * corresponding changes need to be made to {@link HoodieLogBlockVersion} 
TODO : Change this to a class, something
@@ -138,6 +141,20 @@ public abstract class HoodieLogBlock {
     return 
LogReaderUtils.decodeRecordPositionsHeader(logBlockHeader.get(HeaderMetadataType.RECORD_POSITIONS));
   }
 
+  protected void addRecordPositionsToHeader(Set<Long> positionSet, int 
numRecords) {
+    if (positionSet.size() == numRecords) {
+      try {
+        logBlockHeader.put(HeaderMetadataType.RECORD_POSITIONS, 
LogReaderUtils.encodePositions(positionSet));
+      } catch (IOException e) {
+        LOG.error("Cannot write record positions to the log block header.", e);
+      }
+    } else {
+      LOG.warn("There are duplicate keys in the records (number of unique 
positions: {}, "
+              + "number of records: {}). Skip writing record positions to the 
log block header.",
+          positionSet.size(), numRecords);
+    }
+  }
+
   /**
    * Type of the log block WARNING: This enum is serialized as the ordinal. 
Only add new enums at the end.
    */
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
index 796d7454cc5..c1ac0aba1d3 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
@@ -78,13 +78,14 @@ public class HoodieParquetDataBlock extends HoodieDataBlock 
{
   }
 
   public HoodieParquetDataBlock(List<HoodieRecord> records,
+                                boolean shouldWriteRecordPositions,
                                 Map<HeaderMetadataType, String> header,
                                 String keyField,
                                 CompressionCodecName compressionCodecName,
                                 double expectedCompressionRatio,
                                 boolean useDictionaryEncoding
   ) {
-    super(records, header, new HashMap<>(), keyField);
+    super(records, shouldWriteRecordPositions, header, new HashMap<>(), 
keyField);
 
     this.compressionCodecName = Option.of(compressionCodecName);
     this.expectedCompressionRatio = Option.of(expectedCompressionRatio);
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java
index d402f58a40a..be41857a38e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.keygen.BaseKeyGenerator;
 
@@ -39,6 +40,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 /**
  * Utils for Hudi base file.
@@ -69,12 +71,14 @@ public abstract class BaseFileUtils {
 
   /**
    * Read the rowKey list from the given data file.
+   *
    * @param filePath      The data file path
    * @param configuration configuration to build fs object
    * @return Set Set of row keys
    */
   public Set<String> readRowKeys(Configuration configuration, Path filePath) {
-    return filterRowKeys(configuration, filePath, new HashSet<>());
+    return filterRowKeys(configuration, filePath, new HashSet<>())
+        .stream().map(Pair::getKey).collect(Collectors.toSet());
   }
 
   /**
@@ -162,21 +166,23 @@ public abstract class BaseFileUtils {
 
   /**
    * Read the rowKey list matching the given filter, from the given data file.
-   * If the filter is empty, then this will return all the row keys.
+   * If the filter is empty, then this will return all the row keys and 
corresponding positions.
+   *
    * @param filePath      The data file path
    * @param configuration configuration to build fs object
    * @param filter        record keys filter
-   * @return Set Set of row keys matching candidateRecordKeys
+   * @return Set Set of pairs of row key and position matching 
candidateRecordKeys
    */
-  public abstract Set<String> filterRowKeys(Configuration configuration, Path 
filePath, Set<String> filter);
+  public abstract Set<Pair<String, Long>> filterRowKeys(Configuration 
configuration, Path filePath, Set<String> filter);
 
   /**
-   * Fetch {@link HoodieKey}s from the given data file.
+   * Fetch {@link HoodieKey}s with positions from the given data file.
+   *
    * @param configuration configuration to build fs object
    * @param filePath      The data file path
-   * @return {@link List} of {@link HoodieKey}s fetched from the data file
+   * @return {@link List} of pairs of {@link HoodieKey} and position fetched 
from the data file
    */
-  public abstract List<HoodieKey> fetchHoodieKeys(Configuration configuration, 
Path filePath);
+  public abstract List<Pair<HoodieKey, Long>> 
fetchRecordKeysWithPositions(Configuration configuration, Path filePath);
 
   /**
    * Provides a closable iterator for reading the given data file.
@@ -196,13 +202,14 @@ public abstract class BaseFileUtils {
   public abstract ClosableIterator<HoodieKey> 
getHoodieKeyIterator(Configuration configuration, Path filePath);
 
   /**
-   * Fetch {@link HoodieKey}s from the given data file.
-   * @param configuration configuration to build fs object
-   * @param filePath      The data file path
+   * Fetch {@link HoodieKey}s with positions from the given data file.
+   *
+   * @param configuration   configuration to build fs object
+   * @param filePath        The data file path
    * @param keyGeneratorOpt instance of KeyGenerator.
-   * @return {@link List} of {@link HoodieKey}s fetched from the data file
+   * @return {@link List} of pairs of {@link HoodieKey} and position fetched 
from the data file
    */
-  public abstract List<HoodieKey> fetchHoodieKeys(Configuration configuration, 
Path filePath, Option<BaseKeyGenerator> keyGeneratorOpt);
+  public abstract List<Pair<HoodieKey, Long>> 
fetchRecordKeysWithPositions(Configuration configuration, Path filePath, 
Option<BaseKeyGenerator> keyGeneratorOpt);
 
   /**
    * Read the Avro schema of the data file.
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java
index dfbb80cfb63..50fa80213c3 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.MetadataNotFoundException;
@@ -107,7 +108,7 @@ public class OrcUtils extends BaseFileUtils {
    * @return {@link List} of {@link HoodieKey}s fetched from the ORC file
    */
   @Override
-  public List<HoodieKey> fetchHoodieKeys(Configuration configuration, Path 
filePath) {
+  public List<Pair<HoodieKey, Long>> 
fetchRecordKeysWithPositions(Configuration configuration, Path filePath) {
     try {
       if (!filePath.getFileSystem(configuration).exists(filePath)) {
         return Collections.emptyList();
@@ -115,15 +116,19 @@ public class OrcUtils extends BaseFileUtils {
     } catch (IOException e) {
       throw new HoodieIOException("Failed to read from ORC file:" + filePath, 
e);
     }
-    List<HoodieKey> hoodieKeys = new ArrayList<>();
-    try (ClosableIterator<HoodieKey> iterator = 
getHoodieKeyIterator(configuration, filePath, Option.empty()))  {
-      iterator.forEachRemaining(hoodieKeys::add);
+    List<Pair<HoodieKey, Long>> hoodieKeysAndPositions = new ArrayList<>();
+    long position = 0;
+    try (ClosableIterator<HoodieKey> iterator = 
getHoodieKeyIterator(configuration, filePath, Option.empty())) {
+      while (iterator.hasNext()) {
+        hoodieKeysAndPositions.add(Pair.of(iterator.next(), position));
+        position++;
+      }
     }
-    return hoodieKeys;
+    return hoodieKeysAndPositions;
   }
 
   @Override
-  public List<HoodieKey> fetchHoodieKeys(Configuration configuration, Path 
filePath, Option<BaseKeyGenerator> keyGeneratorOpt) {
+  public List<Pair<HoodieKey, Long>> 
fetchRecordKeysWithPositions(Configuration configuration, Path filePath, 
Option<BaseKeyGenerator> keyGeneratorOpt) {
     throw new UnsupportedOperationException("Custom key generator is not 
supported yet");
   }
 
@@ -171,18 +176,19 @@ public class OrcUtils extends BaseFileUtils {
    * Read the rowKey list matching the given filter, from the given ORC file. 
If the filter is empty, then this will
    * return all the rowkeys.
    *
-   * @param conf configuration to build fs object.
-   * @param filePath      The ORC file path.
-   * @param filter        record keys filter
-   * @return Set Set of row keys matching candidateRecordKeys
+   * @param conf     configuration to build fs object.
+   * @param filePath The ORC file path.
+   * @param filter   record keys filter
+   * @return Set Set of pairs of row key and position matching 
candidateRecordKeys
    */
   @Override
-  public Set<String> filterRowKeys(Configuration conf, Path filePath, 
Set<String> filter)
+  public Set<Pair<String, Long>> filterRowKeys(Configuration conf, Path 
filePath, Set<String> filter)
       throws HoodieIOException {
+    long rowPosition = 0;
     try (Reader reader = OrcFile.createReader(filePath, 
OrcFile.readerOptions(conf));) {
       TypeDescription schema = reader.getSchema();
       try (RecordReader recordReader = reader.rows(new 
Options(conf).schema(schema))) {
-        Set<String> filteredRowKeys = new HashSet<>();
+        Set<Pair<String, Long>> filteredRowKeys = new HashSet<>();
         List<String> fieldNames = schema.getFieldNames();
         VectorizedRowBatch batch = schema.createRowBatch();
 
@@ -202,8 +208,9 @@ public class OrcUtils extends BaseFileUtils {
           for (int i = 0; i < batch.size; i++) {
             String rowKey = rowKeys.toString(i);
             if (filter.isEmpty() || filter.contains(rowKey)) {
-              filteredRowKeys.add(rowKey);
+              filteredRowKeys.add(Pair.of(rowKey, rowPosition));
             }
+            rowPosition++;
           }
         }
         return filteredRowKeys;
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
index de5572523c1..2decdd3fc5e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.MetadataNotFoundException;
 import org.apache.hudi.keygen.BaseKeyGenerator;
@@ -74,15 +75,15 @@ public class ParquetUtils extends BaseFileUtils {
 
   /**
    * Read the rowKey list matching the given filter, from the given parquet 
file. If the filter is empty, then this will
-   * return all the rowkeys.
+   * return all the rowkeys and corresponding positions.
    *
    * @param filePath      The parquet file path.
    * @param configuration configuration to build fs object
    * @param filter        record keys filter
-   * @return Set Set of row keys matching candidateRecordKeys
+   * @return Set Set of pairs of row key and position matching 
candidateRecordKeys
    */
   @Override
-  public Set<String> filterRowKeys(Configuration configuration, Path filePath, 
Set<String> filter) {
+  public Set<Pair<String, Long>> filterRowKeys(Configuration configuration, 
Path filePath, Set<String> filter) {
     return filterParquetRowKeys(configuration, filePath, filter, 
HoodieAvroUtils.getRecordKeySchema());
   }
 
@@ -105,10 +106,10 @@ public class ParquetUtils extends BaseFileUtils {
    * @param configuration configuration to build fs object
    * @param filter        record keys filter
    * @param readSchema    schema of columns to be read
-   * @return Set Set of row keys matching candidateRecordKeys
+   * @return Set Set of pairs of row key and position matching 
candidateRecordKeys
    */
-  private static Set<String> filterParquetRowKeys(Configuration configuration, 
Path filePath, Set<String> filter,
-                                                  Schema readSchema) {
+  private static Set<Pair<String, Long>> filterParquetRowKeys(Configuration 
configuration, Path filePath, Set<String> filter,
+                                                              Schema 
readSchema) {
     Option<RecordKeysFilterFunction> filterFunction = Option.empty();
     if (filter != null && !filter.isEmpty()) {
       filterFunction = Option.of(new RecordKeysFilterFunction(filter));
@@ -117,17 +118,19 @@ public class ParquetUtils extends BaseFileUtils {
     conf.addResource(FSUtils.getFs(filePath.toString(), conf).getConf());
     AvroReadSupport.setAvroReadSchema(conf, readSchema);
     AvroReadSupport.setRequestedProjection(conf, readSchema);
-    Set<String> rowKeys = new HashSet<>();
+    Set<Pair<String, Long>> rowKeys = new HashSet<>();
+    long rowPosition = 0;
     try (ParquetReader reader = 
AvroParquetReader.builder(filePath).withConf(conf).build()) {
       Object obj = reader.read();
       while (obj != null) {
         if (obj instanceof GenericRecord) {
           String recordKey = ((GenericRecord) obj).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
           if (!filterFunction.isPresent() || filterFunction.get().apply(recordKey)) {
-            rowKeys.add(recordKey);
+            rowKeys.add(Pair.of(recordKey, rowPosition));
           }
         }
         obj = reader.read();
+        rowPosition++;
       }
     } catch (IOException e) {
       throw new HoodieIOException("Failed to read row keys from Parquet " + 
filePath, e);
@@ -138,15 +141,15 @@ public class ParquetUtils extends BaseFileUtils {
   }
 
   /**
-   * Fetch {@link HoodieKey}s from the given parquet file.
+   * Fetch {@link HoodieKey}s with row positions from the given parquet file.
    *
    * @param filePath      The parquet file path.
    * @param configuration configuration to build fs object
-   * @return {@link List} of {@link HoodieKey}s fetched from the parquet file
+   * @return {@link List} of pairs of {@link HoodieKey} and row position 
fetched from the parquet file
    */
   @Override
-  public List<HoodieKey> fetchHoodieKeys(Configuration configuration, Path 
filePath) {
-    return fetchHoodieKeys(configuration, filePath, Option.empty());
+  public List<Pair<HoodieKey, Long>> 
fetchRecordKeysWithPositions(Configuration configuration, Path filePath) {
+    return fetchRecordKeysWithPositions(configuration, filePath, 
Option.empty());
   }
 
   @Override
@@ -185,19 +188,23 @@ public class ParquetUtils extends BaseFileUtils {
   }
 
   /**
-   * Fetch {@link HoodieKey}s from the given parquet file.
+   * Fetch {@link HoodieKey}s with row positions from the given parquet file.
    *
    * @param configuration   configuration to build fs object
    * @param filePath        The parquet file path.
    * @param keyGeneratorOpt instance of KeyGenerator.
-   * @return {@link List} of {@link HoodieKey}s fetched from the parquet file
+   * @return {@link List} of pairs of {@link HoodieKey} and row position 
fetched from the parquet file
    */
   @Override
-  public List<HoodieKey> fetchHoodieKeys(Configuration configuration, Path 
filePath, Option<BaseKeyGenerator> keyGeneratorOpt) {
-    List<HoodieKey> hoodieKeys = new ArrayList<>();
+  public List<Pair<HoodieKey, Long>> 
fetchRecordKeysWithPositions(Configuration configuration, Path filePath, 
Option<BaseKeyGenerator> keyGeneratorOpt) {
+    List<Pair<HoodieKey, Long>> hoodieKeysAndPositions = new ArrayList<>();
+    long position = 0;
     try (ClosableIterator<HoodieKey> iterator = 
getHoodieKeyIterator(configuration, filePath, keyGeneratorOpt)) {
-      iterator.forEachRemaining(hoodieKeys::add);
-      return hoodieKeys;
+      while (iterator.hasNext()) {
+        hoodieKeysAndPositions.add(Pair.of(iterator.next(), position));
+        position++;
+      }
+      return hoodieKeysAndPositions;
     }
   }
 
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileReader.java
 
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileReader.java
index b1ff386f120..af08cc85ef5 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileReader.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileReader.java
@@ -24,10 +24,12 @@ import org.apache.hudi.common.bloom.BloomFilterFactory;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.VisibleForTesting;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.CloseableMappingIterator;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.common.util.io.ByteBufferBackedInputStream;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
@@ -58,13 +60,13 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
 import java.util.Set;
+import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.stream.Collectors;
 
 import static org.apache.hudi.common.util.CollectionUtils.toStream;
 import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
 import static org.apache.hudi.common.util.TypeUtils.unsafeCast;
-import static org.apache.hudi.common.util.ValidationUtils.checkState;
 
 /**
  * NOTE: PLEASE READ DOCS & COMMENTS CAREFULLY BEFORE MAKING CHANGES
@@ -190,9 +192,9 @@ public class HoodieAvroHFileReader extends 
HoodieAvroFileReaderBase implements H
    * @return Subset of candidate keys that are available
    */
   @Override
-  public Set<String> filterRowKeys(Set<String> candidateRowKeys) {
-    checkState(candidateRowKeys instanceof TreeSet,
-        String.format("HFile reader expects a TreeSet as iterating over 
ordered keys is more performant, got (%s)", 
candidateRowKeys.getClass().getSimpleName()));
+  public Set<Pair<String, Long>> filterRowKeys(Set<String> candidateRowKeys) {
+    // candidateRowKeys must be sorted
+    SortedSet<String> sortedCandidateRowKeys = new TreeSet<>(candidateRowKeys);
 
     synchronized (sharedLock) {
       if (!sharedScanner.isPresent()) {
@@ -200,14 +202,18 @@ public class HoodieAvroHFileReader extends 
HoodieAvroFileReaderBase implements H
         // by default, to minimize amount of traffic to the underlying storage
         sharedScanner = Option.of(getHFileScanner(getSharedHFileReader(), 
true));
       }
-      return candidateRowKeys.stream().filter(k -> {
-        try {
-          return isKeyAvailable(k, sharedScanner.get());
-        } catch (IOException e) {
-          LOG.error("Failed to check key availability: " + k);
-          return false;
-        }
-      }).collect(Collectors.toSet());
+      return sortedCandidateRowKeys.stream()
+          .filter(k -> {
+            try {
+              return isKeyAvailable(k, sharedScanner.get());
+            } catch (IOException e) {
+              LOG.error("Failed to check key availability: " + k);
+              return false;
+            }
+          })
+          // Record position is not supported for HFile
+          .map(key -> Pair.of(key, HoodieRecordLocation.INVALID_POSITION))
+          .collect(Collectors.toSet());
     }
   }
 
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroOrcReader.java 
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroOrcReader.java
index 00ba9fc3bb0..2af8eaa4a46 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroOrcReader.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroOrcReader.java
@@ -22,8 +22,9 @@ import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.util.AvroOrcUtils;
 import org.apache.hudi.common.util.BaseFileUtils;
-import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.OrcReaderIterator;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieIOException;
 
 import org.apache.avro.Schema;
@@ -67,7 +68,7 @@ public class HoodieAvroOrcReader extends 
HoodieAvroFileReaderBase {
   }
 
   @Override
-  public Set<String> filterRowKeys(Set candidateRowKeys) {
+  public Set<Pair<String, Long>> filterRowKeys(Set candidateRowKeys) {
     return orcUtils.filterRowKeys(conf, path, candidateRowKeys);
   }
 
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetReader.java
 
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetReader.java
index 3dd070fa0a7..912ccb28964 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetReader.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetReader.java
@@ -28,6 +28,7 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ParquetReaderIterator;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.CloseableMappingIterator;
+import org.apache.hudi.common.util.collection.Pair;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
@@ -85,7 +86,7 @@ public class HoodieAvroParquetReader extends 
HoodieAvroFileReaderBase {
   }
 
   @Override
-  public Set<String> filterRowKeys(Set<String> candidateRowKeys) {
+  public Set<Pair<String, Long>> filterRowKeys(Set<String> candidateRowKeys) {
     return parquetUtils.filterRowKeys(conf, path, candidateRowKeys);
   }
 
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBootstrapFileReader.java
 
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBootstrapFileReader.java
index b22682be46e..7ac30547a1a 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBootstrapFileReader.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBootstrapFileReader.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.Pair;
 
 import org.apache.avro.Schema;
 
@@ -55,7 +56,7 @@ public abstract class HoodieBootstrapFileReader<T> implements 
HoodieFileReader<T
   }
 
   @Override
-  public Set<String> filterRowKeys(Set<String> candidateRowKeys) {
+  public Set<Pair<String, Long>> filterRowKeys(Set<String> candidateRowKeys) {
     return skeletonFileReader.filterRowKeys(candidateRowKeys);
   }
 
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReader.java 
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReader.java
index 00fff9a220c..77eaef98d33 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReader.java
@@ -18,10 +18,12 @@
 
 package org.apache.hudi.io.storage;
 
-import org.apache.avro.Schema;
 import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.Pair;
+
+import org.apache.avro.Schema;
 
 import java.io.IOException;
 import java.util.Set;
@@ -45,7 +47,7 @@ public interface HoodieFileReader<T> extends AutoCloseable {
 
   BloomFilter readBloomFilter();
 
-  Set<String> filterRowKeys(Set<String> candidateRowKeys);
+  Set<Pair<String, Long>> filterRowKeys(Set<String> candidateRowKeys);
 
   ClosableIterator<HoodieRecord<T>> getRecordIterator(Schema readerSchema, 
Schema requestedSchema) throws IOException;
 
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
index 79031d643d9..199b71ab95f 100755
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
@@ -60,6 +60,7 @@ import 
org.apache.hudi.common.testutils.minicluster.HdfsTestService;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.ExternalSpillableMap;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.CorruptedLogFileException;
 import org.apache.hudi.exception.HoodieIOException;
 
@@ -1395,7 +1396,11 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
             .collect(Collectors.toList());
 
     header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "102");
-    HoodieDeleteBlock deleteBlock = new 
HoodieDeleteBlock(deletedRecords.toArray(new DeleteRecord[50]), header);
+    List<Pair<DeleteRecord, Long>> deleteRecordList = new ArrayList<>();
+    for (DeleteRecord dr : deletedRecords) {
+      deleteRecordList.add(Pair.of(dr, -1L));
+    }
+    HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deleteRecordList, 
false, header);
     writer.appendBlock(deleteBlock);
 
     List<String> allLogFiles =
@@ -1533,9 +1538,12 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
         .collect(Collectors.toList()).subList(0, 50);
 
     header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "102");
-    HoodieDeleteBlock deleteBlock = new 
HoodieDeleteBlock(deletedKeys.stream().map(deletedKey ->
-            DeleteRecord.create(deletedKey.getRecordKey(), 
deletedKey.getPartitionPath()))
-        .collect(Collectors.toList()).toArray(new DeleteRecord[0]), header);
+    List<Pair<DeleteRecord, Long>> deleteRecordList = new ArrayList<>();
+    for (HoodieKey deletedKey : deletedKeys) {
+      deleteRecordList.add(Pair.of(
+          DeleteRecord.create(deletedKey.getRecordKey(), 
deletedKey.getPartitionPath()), -1L));
+    }
+    HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deleteRecordList, 
false, header);
     writer.appendBlock(deleteBlock);
 
     List<String> allLogFiles =
@@ -1553,10 +1561,12 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
     // Recreate the delete block which should have been removed from 
consideration because of rollback block next to it.
     Map<HoodieLogBlock.HeaderMetadataType, String> deleteBlockHeader = new 
HashMap<>();
     deleteBlockHeader.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, 
"102");
-    deleteBlock = new HoodieDeleteBlock(
-        deletedKeys.stream().map(deletedKey ->
-                DeleteRecord.create(deletedKey.getRecordKey(), 
deletedKey.getPartitionPath()))
-            .collect(Collectors.toList()).toArray(new DeleteRecord[0]), 
deleteBlockHeader);
+    deleteRecordList = new ArrayList<>();
+    for (HoodieKey deletedKey : deletedKeys) {
+      deleteRecordList.add(Pair.of(
+          DeleteRecord.create(deletedKey.getRecordKey(), 
deletedKey.getPartitionPath()), -1L));
+    }
+    deleteBlock = new HoodieDeleteBlock(deleteRecordList, false, 
deleteBlockHeader);
     writer.appendBlock(deleteBlock);
 
     FileCreateUtils.createDeltaCommit(basePath, "102", fs);
@@ -1644,16 +1654,24 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
         .collect(Collectors.toList());
 
     header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "102");
-    HoodieDeleteBlock deleteBlock1 = new 
HoodieDeleteBlock(deleteRecords1.toArray(new DeleteRecord[0]), header);
+
+    List<Pair<DeleteRecord, Long>> deleteRecordList = new ArrayList<>();
+    for (DeleteRecord dr : deleteRecords1) {
+      deleteRecordList.add(Pair.of(dr, -1L));
+    }
+    HoodieDeleteBlock deleteBlock1 = new HoodieDeleteBlock(deleteRecordList, 
false, header);
     writer.appendBlock(deleteBlock1);
 
     // Delete another 10 keys with -1 as orderingVal.
     // The deletion should not work
 
     header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "103");
-    HoodieDeleteBlock deleteBlock2 = new 
HoodieDeleteBlock(copyOfRecords1.subList(10, 20).stream()
-        .map(s -> (DeleteRecord.create(((GenericRecord) 
s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(),
-            ((GenericRecord) 
s).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString(), 
-1))).toArray(DeleteRecord[]::new), header);
+    deleteRecordList = copyOfRecords1.subList(10, 20).stream()
+        .map(s -> Pair.of(DeleteRecord.create(((GenericRecord) 
s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(),
+            ((GenericRecord) 
s).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString(), -1), -1L))
+        .collect(Collectors.toList());
+
+    HoodieDeleteBlock deleteBlock2 = new HoodieDeleteBlock(deleteRecordList, 
false, header);
     writer.appendBlock(deleteBlock2);
 
     // Delete another 10 keys with +1 as orderingVal.
@@ -1664,7 +1682,11 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
         .collect(Collectors.toList());
 
     header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "104");
-    HoodieDeleteBlock deleteBlock3 = new 
HoodieDeleteBlock(deletedRecords3.toArray(new DeleteRecord[0]), header);
+    deleteRecordList.clear();
+    for (DeleteRecord dr : deletedRecords3) {
+      deleteRecordList.add(Pair.of(dr, -1L));
+    }
+    HoodieDeleteBlock deleteBlock3 = new HoodieDeleteBlock(deleteRecordList, 
false, header);
     writer.appendBlock(deleteBlock3);
 
     List<String> allLogFiles =
@@ -1760,7 +1782,11 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
             ((GenericRecord) 
s).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString())))
         .collect(Collectors.toList()).subList(0, 50);
 
-    HoodieDeleteBlock deleteBlock = new 
HoodieDeleteBlock(deleteRecords.toArray(new DeleteRecord[50]), header);
+    List<Pair<DeleteRecord, Long>> deleteRecordList = new ArrayList<>();
+    for (DeleteRecord dr : deleteRecords) {
+      deleteRecordList.add(Pair.of(dr, -1L));
+    }
+    HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deleteRecordList, 
false, header);
     writer.appendBlock(deleteBlock);
 
     FileCreateUtils.createDeltaCommit(basePath, "100", fs);
@@ -1817,7 +1843,12 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
         .map(s -> (DeleteRecord.create(((GenericRecord) 
s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(),
             ((GenericRecord) 
s).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString())))
         .collect(Collectors.toList()).subList(0, 50);
-    HoodieDeleteBlock deleteBlock = new 
HoodieDeleteBlock(deleteRecords.toArray(new DeleteRecord[50]), header);
+
+    List<Pair<DeleteRecord, Long>> deleteRecordList = new ArrayList<>();
+    for (DeleteRecord dr : deleteRecords) {
+      deleteRecordList.add(Pair.of(dr, -1L));
+    }
+    HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deleteRecordList, 
false, header);
     writer.appendBlock(deleteBlock);
 
     FileCreateUtils.createDeltaCommit(basePath, "100", fs);
@@ -1906,7 +1937,11 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
         .map(s -> (DeleteRecord.create(((GenericRecord) 
s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(),
             ((GenericRecord) 
s).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString())))
         .collect(Collectors.toList()).subList(0, 50);
-    HoodieDeleteBlock deleteBlock = new 
HoodieDeleteBlock(deleteRecords.toArray(new DeleteRecord[50]), header);
+    List<Pair<DeleteRecord, Long>> deleteRecordList = new ArrayList<>();
+    for (DeleteRecord dr : deleteRecords) {
+      deleteRecordList.add(Pair.of(dr, -1L));
+    }
+    HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deleteRecordList, 
false, header);
     writer.appendBlock(deleteBlock);
 
     FileCreateUtils.createDeltaCommit(basePath, "100", fs);
@@ -1986,7 +2021,11 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
         .map(s -> (DeleteRecord.create(((GenericRecord) 
s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(),
             ((GenericRecord) 
s).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString())))
         .collect(Collectors.toList()).subList(0, 60);
-    writer.appendBlock(new HoodieDeleteBlock(deletedRecords.toArray(new 
DeleteRecord[0]), header));
+    List<Pair<DeleteRecord, Long>> deleteRecordList = new ArrayList<>();
+    for (DeleteRecord dr : deletedRecords) {
+      deleteRecordList.add(Pair.of(dr, -1L));
+    }
+    writer.appendBlock(new HoodieDeleteBlock(deleteRecordList, false, header));
 
     copyOfRecords2.addAll(copyOfRecords1);
     List<String> originalKeys =
@@ -2794,13 +2833,13 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
   @ValueSource(booleans = {false, true})
   public void testGetRecordPositions(boolean addRecordPositionsHeader) throws 
IOException {
     Map<HeaderMetadataType, String> header = new HashMap<>();
-    List<Long> positions = new ArrayList<>();
+    Set<Long> positions = new HashSet<>();
     if (addRecordPositionsHeader) {
       positions = TestLogReaderUtils.generatePositions();
       String content = LogReaderUtils.encodePositions(positions);
       header.put(HeaderMetadataType.RECORD_POSITIONS, content);
     }
-    HoodieLogBlock logBlock = new HoodieDeleteBlock(new DeleteRecord[0], 
header);
+    HoodieLogBlock logBlock = new HoodieDeleteBlock(Collections.emptyList(), 
addRecordPositionsHeader, header);
     if (addRecordPositionsHeader) {
       TestLogReaderUtils.assertPositionEquals(positions, 
logBlock.getRecordPositions());
     }
@@ -2817,11 +2856,11 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
       case CDC_DATA_BLOCK:
         return new HoodieCDCDataBlock(records, header, 
HoodieRecord.RECORD_KEY_METADATA_FIELD);
       case AVRO_DATA_BLOCK:
-        return new HoodieAvroDataBlock(records, header, 
HoodieRecord.RECORD_KEY_METADATA_FIELD);
+        return new HoodieAvroDataBlock(records, false, header, 
HoodieRecord.RECORD_KEY_METADATA_FIELD);
       case HFILE_DATA_BLOCK:
         return new HoodieHFileDataBlock(records, header, 
Compression.Algorithm.GZ, pathForReader);
       case PARQUET_DATA_BLOCK:
-        return new HoodieParquetDataBlock(records, header, 
HoodieRecord.RECORD_KEY_METADATA_FIELD, CompressionCodecName.GZIP, 0.1, true);
+        return new HoodieParquetDataBlock(records, false, header, 
HoodieRecord.RECORD_KEY_METADATA_FIELD, CompressionCodecName.GZIP, 0.1, true);
       default:
         throw new RuntimeException("Unknown data block type " + dataBlockType);
     }
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormatAppendFailure.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormatAppendFailure.java
index 83a439c3ad1..da3deeb2da4 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormatAppendFailure.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormatAppendFailure.java
@@ -38,7 +38,6 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
-
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assumptions;
 import org.junit.jupiter.api.BeforeAll;
@@ -109,7 +108,7 @@ public class TestHoodieLogFormatAppendFailure {
     Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>(2);
     header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
     header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, 
getSimpleSchema().toString());
-    HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, header, 
HoodieRecord.RECORD_KEY_METADATA_FIELD);
+    HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, false, 
header, HoodieRecord.RECORD_KEY_METADATA_FIELD);
 
     Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(testPath)
         
.withFileExtension(HoodieArchivedLogFile.ARCHIVE_EXTENSION).withFileId("commits")
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/table/log/TestLogReaderUtils.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/table/log/TestLogReaderUtils.java
index fd8e3a5cd28..f06599fbe74 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/table/log/TestLogReaderUtils.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/table/log/TestLogReaderUtils.java
@@ -23,7 +23,6 @@ import org.junit.jupiter.api.Test;
 import org.roaringbitmap.longlong.Roaring64NavigableMap;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -42,7 +41,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
 public class TestLogReaderUtils {
   @Test
   public void testEncodeAndDecodePositions() throws IOException {
-    List<Long> positions = generatePositions();
+    Set<Long> positions = generatePositions();
     String content = LogReaderUtils.encodePositions(positions);
     Roaring64NavigableMap roaring64NavigableMap = 
LogReaderUtils.decodeRecordPositionsHeader(content);
     assertPositionEquals(positions, roaring64NavigableMap);
@@ -51,7 +50,7 @@ public class TestLogReaderUtils {
   @Test
   public void testEncodeBitmapAndDecodePositions() throws IOException {
     Roaring64NavigableMap positionBitmap = new Roaring64NavigableMap();
-    List<Long> positions = generatePositions();
+    Set<Long> positions = generatePositions();
     positions.forEach(positionBitmap::add);
     String content = LogReaderUtils.encodePositions(positionBitmap);
     Roaring64NavigableMap roaring64NavigableMap = 
LogReaderUtils.decodeRecordPositionsHeader(content);
@@ -60,25 +59,25 @@ public class TestLogReaderUtils {
 
   @Test
   public void testCompatibilityOfDecodingPositions() throws IOException {
-    List<Long> expectedPositions = Arrays.stream(
+    Set<Long> expectedPositions = Arrays.stream(
             
readLastLineFromResourceFile("/format/expected_record_positions.data").split(","))
-        .map(Long::parseLong).collect(Collectors.toList());
+        .map(Long::parseLong).collect(Collectors.toSet());
     String content = 
readLastLineFromResourceFile("/format/record_positions_header_v3.data");
     Roaring64NavigableMap roaring64NavigableMap = 
LogReaderUtils.decodeRecordPositionsHeader(content);
     assertPositionEquals(expectedPositions, roaring64NavigableMap);
   }
 
-  public static List<Long> generatePositions() {
+  public static Set<Long> generatePositions() {
     Random random = new Random(0x2023);
     Set<Long> positions = new HashSet<>();
     while (positions.size() < 1000) {
       long pos = Math.abs(random.nextLong() % 1_000_000_000_000L);
       positions.add(pos);
     }
-    return new ArrayList<>(positions);
+    return positions;
   }
 
-  public static void assertPositionEquals(List<Long> expectedPositions,
+  public static void assertPositionEquals(Set<Long> expectedPositions,
                                           Roaring64NavigableMap 
roaring64NavigableMap) {
     List<Long> sortedExpectedPositions =
         expectedPositions.stream().sorted().collect(Collectors.toList());
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/table/log/block/TestHoodieDeleteBlock.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/table/log/block/TestHoodieDeleteBlock.java
index ccba018e64f..7441f8cdd41 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/table/log/block/TestHoodieDeleteBlock.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/table/log/block/TestHoodieDeleteBlock.java
@@ -21,6 +21,7 @@ package org.apache.hudi.common.table.log.block;
 
 import org.apache.hudi.common.model.DeleteRecord;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -32,9 +33,11 @@ import java.math.BigDecimal;
 import java.sql.Timestamp;
 import java.time.Instant;
 import java.time.LocalDate;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Random;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -116,7 +119,11 @@ public class TestHoodieDeleteBlock {
   }
 
   public void testDeleteBlockWithValidation(DeleteRecord[] deleteRecords) 
throws IOException {
-    HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deleteRecords, new 
HashMap<>());
+    List<Pair<DeleteRecord, Long>> deleteRecordList = new ArrayList<>();
+    for (DeleteRecord dr : deleteRecords) {
+      deleteRecordList.add(Pair.of(dr, -1L));
+    }
+    HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deleteRecordList, 
false, new HashMap<>());
     byte[] contentBytes = deleteBlock.getContentBytes();
     HoodieDeleteBlock deserializeDeleteBlock = new HoodieDeleteBlock(
         Option.of(contentBytes), null, true, Option.empty(), new HashMap<>(), 
new HashMap<>());
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/util/TestParquetUtils.java 
b/hudi-common/src/test/java/org/apache/hudi/common/util/TestParquetUtils.java
index c29e9275bbc..fec7dc7fd32 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/util/TestParquetUtils.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/util/TestParquetUtils.java
@@ -28,6 +28,7 @@ import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.keygen.BaseKeyGenerator;
 
 import org.apache.avro.JsonProperties;
@@ -120,13 +121,13 @@ public class TestParquetUtils extends 
HoodieCommonTestHarness {
     writeParquetFile(typeCode, filePath, rowKeys);
 
     // Read and verify
-    Set<String> filtered =
+    Set<Pair<String, Long>> filtered =
         parquetUtils.filterRowKeys(HoodieTestUtils.getDefaultHadoopConf(), new 
Path(filePath), filter);
 
     assertEquals(filter.size(), filtered.size(), "Filtered count does not 
match");
 
-    for (String rowKey : filtered) {
-      assertTrue(filter.contains(rowKey), "filtered key must be in the given 
filter");
+    for (Pair<String, Long> rowKeyAndPosition : filtered) {
+      assertTrue(filter.contains(rowKeyAndPosition.getLeft()), "filtered key 
must be in the given filter");
     }
   }
 
@@ -147,12 +148,12 @@ public class TestParquetUtils extends 
HoodieCommonTestHarness {
     writeParquetFile(typeCode, filePath, rowKeys, schema, true, partitionPath);
 
     // Read and verify
-    List<HoodieKey> fetchedRows =
-        parquetUtils.fetchHoodieKeys(HoodieTestUtils.getDefaultHadoopConf(), 
new Path(filePath));
+    List<Pair<HoodieKey, Long>> fetchedRows =
+        
parquetUtils.fetchRecordKeysWithPositions(HoodieTestUtils.getDefaultHadoopConf(),
 new Path(filePath));
     assertEquals(rowKeys.size(), fetchedRows.size(), "Total count does not 
match");
 
-    for (HoodieKey entry : fetchedRows) {
-      assertTrue(expected.contains(entry), "Record key must be in the given 
filter");
+    for (Pair<HoodieKey, Long> entry : fetchedRows) {
+      assertTrue(expected.contains(entry.getLeft()), "Record key must be in 
the given filter");
     }
   }
 
@@ -168,18 +169,18 @@ public class TestParquetUtils extends 
HoodieCommonTestHarness {
     }
 
     String filePath = Paths.get(basePath, "test.parquet").toUri().toString();
-    Schema schema = getSchemaWithFields(Arrays.asList(new String[]{"abc", 
"def"}));
+    Schema schema = getSchemaWithFields(Arrays.asList(new String[] {"abc", 
"def"}));
     writeParquetFile(BloomFilterTypeCode.SIMPLE.name(), filePath, rowKeys, 
schema, true, partitionPath,
         false, "abc", "def");
 
     // Read and verify
-    List<HoodieKey> fetchedRows =
-        parquetUtils.fetchHoodieKeys(HoodieTestUtils.getDefaultHadoopConf(), 
new Path(filePath),
-            Option.of(new TestBaseKeyGen("abc","def")));
+    List<Pair<HoodieKey, Long>> fetchedRows =
+        
parquetUtils.fetchRecordKeysWithPositions(HoodieTestUtils.getDefaultHadoopConf(),
 new Path(filePath),
+            Option.of(new TestBaseKeyGen("abc", "def")));
     assertEquals(rowKeys.size(), fetchedRows.size(), "Total count does not 
match");
 
-    for (HoodieKey entry : fetchedRows) {
-      assertTrue(expected.contains(entry), "Record key must be in the given 
filter");
+    for (Pair<HoodieKey, Long> entry : fetchedRows) {
+      assertTrue(expected.contains(entry.getLeft()), "Record key must be in 
the given filter");
     }
   }
 
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/util/TestSerializationUtils.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/util/TestSerializationUtils.java
index c2e992223f4..a4d7cb29023 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/util/TestSerializationUtils.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/util/TestSerializationUtils.java
@@ -21,15 +21,18 @@ package org.apache.hudi.common.util;
 import org.apache.hudi.common.model.DeleteRecord;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
+import org.apache.hudi.common.util.collection.Pair;
 
 import org.apache.avro.util.Utf8;
 import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.Objects;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -69,7 +72,9 @@ public class TestSerializationUtils {
   @Test
   public void testClassFullyQualifiedNameSerialization() throws IOException {
     DeleteRecord deleteRecord = DeleteRecord.create(new HoodieKey("key", 
"partition"));
-    HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(new 
DeleteRecord[]{deleteRecord}, Collections.emptyMap());
+    List<Pair<DeleteRecord, Long>> deleteRecordList = new ArrayList<>();
+    deleteRecordList.add(Pair.of(deleteRecord, -1L));
+    HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deleteRecordList, 
false, Collections.emptyMap());
 
     byte[] firstBytes = SerializationUtils.serialize(deleteBlock);
     byte[] secondBytes = SerializationUtils.serialize(deleteBlock);
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/io/storage/TestHoodieReaderWriterBase.java
 
b/hudi-common/src/test/java/org/apache/hudi/io/storage/TestHoodieReaderWriterBase.java
index f6e0fa8f416..d8923d815b3 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/io/storage/TestHoodieReaderWriterBase.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/io/storage/TestHoodieReaderWriterBase.java
@@ -23,6 +23,8 @@ import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.util.collection.Pair;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
@@ -242,10 +244,14 @@ public abstract class TestHoodieReaderWriterBase {
   private void verifyFilterRowKeys(HoodieAvroFileReader hoodieReader) {
     Set<String> candidateRowKeys = IntStream.range(40, NUM_RECORDS * 2)
         .mapToObj(i -> "key" + String.format("%02d", 
i)).collect(Collectors.toCollection(TreeSet::new));
-    List<String> expectedKeys = IntStream.range(40, NUM_RECORDS)
-        .mapToObj(i -> "key" + String.format("%02d", 
i)).sorted().collect(Collectors.toList());
-    assertEquals(expectedKeys, hoodieReader.filterRowKeys(candidateRowKeys)
-        .stream().sorted().collect(Collectors.toList()));
+    Set<Pair<String, Long>> expectedKeys = IntStream.range(40, NUM_RECORDS)
+        .mapToObj(i -> {
+          // record position is not written for HFile format
+          long position = hoodieReader instanceof HoodieAvroHFileReader ? 
HoodieRecordLocation.INVALID_POSITION : (long) i;
+          String key = "key" + String.format("%02d", i);
+          return Pair.of(key, position);
+        }).collect(Collectors.toSet());
+    assertEquals(expectedKeys, hoodieReader.filterRowKeys(candidateRowKeys));
   }
 
   private void verifyReaderWithSchema(String schemaPath, HoodieAvroFileReader 
hoodieReader) throws IOException {
diff --git 
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
 
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
index 4207e3bf113..9ebe2660d2e 100644
--- 
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
+++ 
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
@@ -400,9 +400,9 @@ public class InputFormatTestUtil {
       dataBlock = new HoodieHFileDataBlock(
           hoodieRecords, header, Compression.Algorithm.GZ, 
writer.getLogFile().getPath());
     } else if (logBlockType == 
HoodieLogBlock.HoodieLogBlockType.PARQUET_DATA_BLOCK) {
-      dataBlock = new HoodieParquetDataBlock(hoodieRecords, header, 
HoodieRecord.RECORD_KEY_METADATA_FIELD, CompressionCodecName.GZIP, 0.1, true);
+      dataBlock = new HoodieParquetDataBlock(hoodieRecords, false, header, 
HoodieRecord.RECORD_KEY_METADATA_FIELD, CompressionCodecName.GZIP, 0.1, true);
     } else {
-      dataBlock = new HoodieAvroDataBlock(hoodieRecords, header, 
HoodieRecord.RECORD_KEY_METADATA_FIELD);
+      dataBlock = new HoodieAvroDataBlock(hoodieRecords, false, header, 
HoodieRecord.RECORD_KEY_METADATA_FIELD);
     }
     writer.appendBlock(dataBlock);
     return writer;
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieSparkMergeOnReadTableClustering.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieSparkMergeOnReadTableClustering.java
index c6b0560b87e..46bd78fd775 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieSparkMergeOnReadTableClustering.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieSparkMergeOnReadTableClustering.java
@@ -155,16 +155,18 @@ class TestHoodieSparkMergeOnReadTableClustering extends 
SparkClientFunctionalTes
 
   private static Stream<Arguments> testClusteringWithNoBaseFiles() {
     return Stream.of(
-        Arguments.of(true, true),
-        Arguments.of(true, false),
-        Arguments.of(false, true),
-        Arguments.of(false, false)
+        Arguments.of(true, true, false),
+        Arguments.of(true, false, false),
+        Arguments.of(false, true, false),
+        Arguments.of(false, false, false),
+        // do updates with file slice having no base files and write record 
positions in log blocks
+        Arguments.of(true, true, true)
     );
   }
 
   @ParameterizedTest
   @MethodSource
-  void testClusteringWithNoBaseFiles(boolean clusteringAsRow, boolean 
doUpdates) throws Exception {
+  void testClusteringWithNoBaseFiles(boolean clusteringAsRow, boolean 
doUpdates, boolean shouldWriteRecordPositions) throws Exception {
     // set low compaction small File Size to generate more file groups.
     HoodieWriteConfig.Builder cfgBuilder = HoodieWriteConfig.newBuilder()
         .forTable("test-trip-table")
@@ -185,6 +187,7 @@ class TestHoodieSparkMergeOnReadTableClustering extends 
SparkClientFunctionalTes
         // set index type to INMEMORY so that log files can be indexed, and it 
is safe to send
         // inserts straight to the log to produce file slices with only log 
files and no data files
         
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build())
+        .withWriteRecordPositionsEnabled(shouldWriteRecordPositions)
         .withClusteringConfig(HoodieClusteringConfig.newBuilder()
             .withClusteringMaxNumGroups(10)
             .withClusteringTargetPartitions(0)
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala
index a1b4f3e307e..300c9ab877b 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala
@@ -138,6 +138,8 @@ class TestMORDataSourceStorage extends 
SparkClientFunctionalTestHarness {
       "hoodie.upsert.shuffle.parallelism" -> "4",
       "hoodie.bulkinsert.shuffle.parallelism" -> "2",
       "hoodie.delete.shuffle.parallelism" -> "1",
+      "hoodie.merge.small.file.group.candidates.limit" -> "0",
+      HoodieWriteConfig.WRITE_RECORD_POSITIONS.key -> "true",
       DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
       DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition_path",
       DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp",
diff --git 
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
 
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
index d8a3a01e72e..8958f9de92d 100644
--- 
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
+++ 
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
@@ -521,7 +521,7 @@ public class HiveTestUtil {
     Map<HeaderMetadataType, String> header = new HashMap<>(2);
     header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, 
dataFile.getCommitTime());
     header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
-    HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, header, 
HoodieRecord.RECORD_KEY_METADATA_FIELD);
+    HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, false, 
header, HoodieRecord.RECORD_KEY_METADATA_FIELD);
     logWriter.appendBlock(dataBlock);
     logWriter.close();
     return logWriter.getLogFile();

Reply via email to