This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 3018c492e42 [HUDI-8564] Removing WriteStatus references in Hoodie 
Metadata writer flow (#12321)
3018c492e42 is described below

commit 3018c492e420a045d4614e96e5705543b1f09eb0
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Wed Nov 27 21:45:21 2024 -0800

    [HUDI-8564] Removing WriteStatus references in Hoodie Metadata writer flow 
(#12321)
    
    - Fixing RLI and SI record generation in MDT so that it no longer relies on 
RDD<WriteStatus>, since that approach has edge cases with regard to Spark task 
retries. This patch instead performs on-demand reads from base and log files to 
generate RLI and SI records for MDT.
    
    ---------
    
    Co-authored-by: vinoth chandar <[email protected]>
---
 .../org/apache/hudi/client/BaseHoodieClient.java   |   6 +-
 .../hudi/client/BaseHoodieTableServiceClient.java  |  14 +-
 .../apache/hudi/client/BaseHoodieWriteClient.java  |  15 +-
 .../java/org/apache/hudi/io/HoodieMergeHandle.java |   4 +
 .../metadata/HoodieBackedTableMetadataWriter.java  |  85 +---
 .../hudi/metadata/HoodieTableMetadataWriter.java   |   8 +-
 .../hudi/table/action/BaseActionExecutor.java      |   6 +-
 .../action/commit/BaseCommitActionExecutor.java    |   4 +-
 .../index/WriteStatBasedIndexingCatchupTask.java   |   2 +-
 .../common/testutils/HoodieMetadataTestTable.java  |  12 +-
 .../hudi/client/HoodieFlinkTableServiceClient.java |  14 +-
 .../apache/hudi/client/HoodieFlinkWriteClient.java |  13 +-
 .../commit/BaseFlinkCommitActionExecutor.java      |   4 +-
 .../hudi/client/HoodieJavaTableServiceClient.java  |   7 -
 .../apache/hudi/client/HoodieJavaWriteClient.java  |   2 +-
 .../commit/BaseJavaCommitActionExecutor.java       |   3 +-
 .../hudi/client/SparkRDDTableServiceClient.java    |   5 -
 .../apache/hudi/client/SparkRDDWriteClient.java    |   2 +-
 .../SparkBootstrapCommitActionExecutor.java        |   2 +-
 .../commit/BaseSparkCommitActionExecutor.java      |   2 +-
 .../hudi/client/TestSparkRDDWriteClient.java       |   2 +-
 .../functional/TestConsistentBucketIndex.java      |   2 +-
 .../TestMetadataUtilRLIandSIRecordGeneration.java  | 506 +++++++++++++++++++++
 .../apache/hudi/io/TestHoodieTimelineArchiver.java |   2 +-
 .../TestHoodieSparkMergeOnReadTableCompaction.java |   4 +-
 .../TestSparkNonBlockingConcurrencyControl.java    |  11 +-
 .../hudi/testutils/HoodieCleanerTestBase.java      |   2 +-
 .../apache/hudi/common/model/HoodieWriteStat.java  |  17 +-
 .../table/log/HoodieUnMergedLogRecordScanner.java  |  30 +-
 .../hudi/metadata/BaseFileRecordParsingUtils.java  | 186 ++++++++
 .../hudi/metadata/HoodieTableMetadataUtil.java     | 197 ++++++++
 .../hudi/common/model/TestHoodieWriteStat.java     |   6 +
 .../hudi/sink/clustering/ClusteringCommitSink.java |   4 +-
 .../internal/DataSourceInternalWriterHelper.java   |   2 +-
 .../TestSparkConsistentBucketClustering.java       |   4 +-
 .../functional/TestSparkSortAndSizeClustering.java |   2 +-
 .../hudi/functional/RecordLevelIndexTestBase.scala |   4 +-
 .../hudi/functional/TestRecordLevelIndex.scala     |  28 +-
 .../offlinejob/HoodieOfflineJobTestBase.java       |   2 +-
 39 files changed, 1037 insertions(+), 184 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
index 9de7f647092..c054a817b52 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
@@ -24,7 +24,6 @@ import 
org.apache.hudi.client.embedded.EmbeddedTimelineService;
 import org.apache.hudi.client.heartbeat.HoodieHeartbeatClient;
 import org.apache.hudi.client.transaction.TransactionManager;
 import org.apache.hudi.client.utils.TransactionUtils;
-import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieWriteStat;
@@ -269,14 +268,13 @@ public abstract class BaseHoodieClient implements 
Serializable, AutoCloseable {
    * @param table         {@link HoodieTable} of interest.
    * @param instantTime   instant time of the commit.
    * @param metadata      instance of {@link HoodieCommitMetadata}.
-   * @param writeStatuses Write statuses of the commit
    */
-  protected void writeTableMetadata(HoodieTable table, String instantTime, 
HoodieCommitMetadata metadata, HoodieData<WriteStatus> writeStatuses) {
+  protected void writeTableMetadata(HoodieTable table, String instantTime, 
HoodieCommitMetadata metadata) {
     context.setJobStatus(this.getClass().getSimpleName(), "Committing to 
metadata table: " + config.getTableName());
     Option<HoodieTableMetadataWriter> metadataWriterOpt = 
table.getMetadataWriter(instantTime);
     if (metadataWriterOpt.isPresent()) {
       try (HoodieTableMetadataWriter metadataWriter = metadataWriterOpt.get()) 
{
-        metadataWriter.updateFromWriteStatuses(metadata, writeStatuses, 
instantTime);
+        metadataWriter.update(metadata, instantTime);
       } catch (Exception e) {
         if (e instanceof HoodieException) {
           throw (HoodieException) e;
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
index c6d1d8048fa..e96a6707247 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
@@ -31,7 +31,6 @@ import org.apache.hudi.client.heartbeat.HeartbeatUtils;
 import org.apache.hudi.client.timeline.HoodieTimelineArchiver;
 import org.apache.hudi.client.timeline.TimelineArchivers;
 import org.apache.hudi.common.HoodiePendingRollbackInfo;
-import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.ActionType;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -341,7 +340,7 @@ public abstract class BaseHoodieTableServiceClient<I, T, O> 
extends BaseHoodieCl
       this.txnManager.beginTransaction(Option.of(compactionInstant), 
Option.empty());
       finalizeWrite(table, compactionCommitTime, writeStats);
       // commit to data table after committing to metadata table.
-      writeTableMetadata(table, compactionCommitTime, metadata, 
context.emptyHoodieData());
+      writeTableMetadata(table, compactionCommitTime, metadata);
       LOG.info("Committing Compaction {}", compactionCommitTime);
       LOG.debug("Compaction {} finished with result: {}", 
compactionCommitTime, metadata);
       CompactHelpers.getInstance().completeInflightCompaction(table, 
compactionCommitTime, metadata);
@@ -404,7 +403,7 @@ public abstract class BaseHoodieTableServiceClient<I, T, O> 
extends BaseHoodieCl
       preCommit(metadata);
       finalizeWrite(table, logCompactionCommitTime, writeStats);
       // commit to data table after committing to metadata table.
-      writeTableMetadata(table, logCompactionCommitTime, metadata, 
context.emptyHoodieData());
+      writeTableMetadata(table, logCompactionCommitTime, metadata);
       LOG.info("Committing Log Compaction {}", logCompactionCommitTime);
       LOG.debug("Log Compaction {} finished with result {}", 
logCompactionCommitTime, metadata);
       CompactHelpers.getInstance().completeInflightLogCompaction(table, 
logCompactionCommitTime, metadata);
@@ -490,7 +489,7 @@ public abstract class BaseHoodieTableServiceClient<I, T, O> 
extends BaseHoodieCl
 
     // TODO : Where is shouldComplete used ?
     if (shouldComplete && clusteringMetadata.getCommitMetadata().isPresent()) {
-      completeClustering((HoodieReplaceCommitMetadata) 
clusteringMetadata.getCommitMetadata().get(), table, clusteringInstant, 
Option.ofNullable(convertToWriteStatus(writeMetadata)));
+      completeClustering((HoodieReplaceCommitMetadata) 
clusteringMetadata.getCommitMetadata().get(), table, clusteringInstant);
     }
     return clusteringMetadata;
   }
@@ -522,12 +521,9 @@ public abstract class BaseHoodieTableServiceClient<I, T, 
O> extends BaseHoodieCl
 
   protected abstract HoodieWriteMetadata<O> 
convertToOutputMetadata(HoodieWriteMetadata<T> writeMetadata);
 
-  protected abstract HoodieData<WriteStatus> 
convertToWriteStatus(HoodieWriteMetadata<T> writeMetadata);
-
   private void completeClustering(HoodieReplaceCommitMetadata metadata,
                                   HoodieTable table,
-                                  String clusteringCommitTime,
-                                  Option<HoodieData<WriteStatus>> 
writeStatuses) {
+                                  String clusteringCommitTime) {
     List<HoodieWriteStat> writeStats = metadata.getWriteStats();
     handleWriteErrors(writeStats, TableServiceType.CLUSTER);
     final HoodieInstant clusteringInstant = 
ClusteringUtils.getInflightClusteringInstant(clusteringCommitTime,
@@ -542,7 +538,7 @@ public abstract class BaseHoodieTableServiceClient<I, T, O> 
extends BaseHoodieCl
         preCommit(metadata);
       }
       // Update table's metadata (table)
-      writeTableMetadata(table, clusteringInstant.requestedTime(), metadata, 
writeStatuses.orElseGet(context::emptyHoodieData));
+      writeTableMetadata(table, clusteringInstant.requestedTime(), metadata);
 
       LOG.info("Committing Clustering {}", clusteringCommitTime);
       LOG.debug("Clustering {} finished with result {}", clusteringCommitTime, 
metadata);
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index 5781b0cb87a..0dffda05a59 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -34,7 +34,6 @@ import org.apache.hudi.client.utils.TransactionUtils;
 import org.apache.hudi.common.HoodiePendingRollbackInfo;
 import org.apache.hudi.common.config.HoodieCommonConfig;
 import org.apache.hudi.common.config.HoodieConfig;
-import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.ActionType;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -216,12 +215,12 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> 
extends BaseHoodieClient
                                  String commitActionType, Map<String, 
List<String>> partitionToReplacedFileIds,
                                  Option<BiConsumer<HoodieTableMetaClient, 
HoodieCommitMetadata>> extraPreCommitFunc);
 
-  public boolean commitStats(String instantTime, HoodieData<WriteStatus> 
writeStatuses, List<HoodieWriteStat> stats, Option<Map<String, String>> 
extraMetadata,
+  public boolean commitStats(String instantTime, List<HoodieWriteStat> stats, 
Option<Map<String, String>> extraMetadata,
                              String commitActionType) {
-    return commitStats(instantTime, writeStatuses, stats, extraMetadata, 
commitActionType, Collections.emptyMap(), Option.empty());
+    return commitStats(instantTime, stats, extraMetadata, commitActionType, 
Collections.emptyMap(), Option.empty());
   }
 
-  public boolean commitStats(String instantTime, HoodieData<WriteStatus> 
writeStatuses, List<HoodieWriteStat> stats,
+  public boolean commitStats(String instantTime, List<HoodieWriteStat> stats,
                              Option<Map<String, String>> extraMetadata,
                              String commitActionType, Map<String, 
List<String>> partitionToReplaceFileIds,
                              Option<BiConsumer<HoodieTableMetaClient, 
HoodieCommitMetadata>> extraPreCommitFunc) {
@@ -243,7 +242,7 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> 
extends BaseHoodieClient
       if (extraPreCommitFunc.isPresent()) {
         extraPreCommitFunc.get().accept(table.getMetaClient(), metadata);
       }
-      commit(table, commitActionType, instantTime, metadata, stats, 
writeStatuses);
+      commit(table, commitActionType, instantTime, metadata, stats);
       postCommit(table, metadata, instantTime, extraMetadata);
       LOG.info("Committed " + instantTime);
     } catch (IOException e) {
@@ -282,7 +281,7 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> 
extends BaseHoodieClient
   }
 
   protected void commit(HoodieTable table, String commitActionType, String 
instantTime, HoodieCommitMetadata metadata,
-                        List<HoodieWriteStat> stats, HoodieData<WriteStatus> 
writeStatuses) throws IOException {
+                        List<HoodieWriteStat> stats) throws IOException {
     LOG.info("Committing " + instantTime + " action " + commitActionType);
     HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
     // Finalize write
@@ -293,7 +292,7 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> 
extends BaseHoodieClient
       saveInternalSchema(table, instantTime, metadata);
     }
     // update Metadata table
-    writeTableMetadata(table, instantTime, metadata, writeStatuses);
+    writeTableMetadata(table, instantTime, metadata);
     activeTimeline.saveAsComplete(false, 
table.getMetaClient().createNewInstant(HoodieInstant.State.INFLIGHT, 
commitActionType, instantTime),
         
serializeCommitMetadata(table.getMetaClient().getCommitMetadataSerDe(), 
metadata));
   }
@@ -1627,7 +1626,7 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> 
extends BaseHoodieClient
     // try to save history schemas
     FileBasedInternalSchemaStorageManager schemasManager = new 
FileBasedInternalSchemaStorageManager(metaClient);
     schemasManager.persistHistorySchemaStr(instantTime, 
SerDeHelper.inheritSchemas(newSchema, historySchemaStr));
-    commitStats(instantTime, context.emptyHoodieData(), 
Collections.emptyList(), Option.of(extraMeta), commitActionType);
+    commitStats(instantTime, Collections.emptyList(), Option.of(extraMeta), 
commitActionType);
   }
 
   private InternalSchema getInternalSchema(TableSchemaResolver schemaUtil) {
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index 1bf6f6b0138..707c86dd73a 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -171,6 +171,10 @@ public class HoodieMergeHandle<T, I, K, O> extends 
HoodieWriteHandle<T, I, K, O>
     try {
       String latestValidFilePath = baseFileToMerge.getFileName();
       writeStatus.getStat().setPrevCommit(baseFileToMerge.getCommitTime());
+      // At the moment, we only support SI for overwrite with latest payload. 
So, we don't need to embed entire file slice here.
+      // HUDI-8518 will be taken up to fix it for any payload during which we 
might require entire file slice to be set here.
+      // Already AppendHandle adds all logs file from current file slice to 
HoodieDeltaWriteStat.
+      writeStatus.getStat().setPrevBaseFile(latestValidFilePath);
 
       HoodiePartitionMetadata partitionMetadata = new 
HoodiePartitionMetadata(storage, instantTime,
           new StoragePath(config.getBasePath()),
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index e05ab249dca..a09ea08eadf 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -25,7 +25,6 @@ import org.apache.hudi.avro.model.HoodieRestoreMetadata;
 import org.apache.hudi.avro.model.HoodieRestorePlan;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
 import org.apache.hudi.client.BaseHoodieWriteClient;
-import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.EngineType;
@@ -42,7 +41,6 @@ import org.apache.hudi.common.model.HoodieIndexDefinition;
 import org.apache.hudi.common.model.HoodieIndexMetadata;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordDelegate;
 import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
 import org.apache.hudi.common.model.HoodieTableType;
@@ -1043,29 +1041,28 @@ public abstract class 
HoodieBackedTableMetadataWriter<I> implements HoodieTableM
 
   /**
    * Update from {@code HoodieCommitMetadata}.
-   *
    * @param commitMetadata {@code HoodieCommitMetadata}
    * @param instantTime    Timestamp at which the commit was performed
    */
   @Override
-  public void updateFromWriteStatuses(HoodieCommitMetadata commitMetadata, 
HoodieData<WriteStatus> writeStatus, String instantTime) {
+  public void update(HoodieCommitMetadata commitMetadata, String instantTime) {
     processAndCommit(instantTime, () -> {
       Map<String, HoodieData<HoodieRecord>> partitionToRecordMap =
           HoodieTableMetadataUtil.convertMetadataToRecords(
               engineContext, dataWriteConfig, commitMetadata, instantTime, 
dataMetaClient,
               enabledPartitionTypes, dataWriteConfig.getBloomFilterType(),
               dataWriteConfig.getBloomIndexParallelism(), 
dataWriteConfig.isMetadataColumnStatsIndexEnabled(),
-              dataWriteConfig.getColumnStatsIndexParallelism(), 
dataWriteConfig.getColumnsEnabledForColumnStatsIndex(), 
dataWriteConfig.getMetadataConfig());
+              dataWriteConfig.getColumnStatsIndexParallelism(), 
dataWriteConfig.getColumnsEnabledForColumnStatsIndex(), 
dataWriteConfig.getWritesFileIdEncoding(),
+              dataWriteConfig.getMetadataConfig());
 
       // Updates for record index are created by parsing the WriteStatus which 
is a hudi-client object. Hence, we cannot yet move this code
       // to the HoodieTableMetadataUtil class in hudi-common.
       if (dataWriteConfig.isRecordIndexEnabled()) {
-        HoodieData<HoodieRecord> updatesFromWriteStatuses = 
getRecordIndexUpserts(writeStatus);
-        HoodieData<HoodieRecord> additionalUpdates = 
getRecordIndexAdditionalUpserts(updatesFromWriteStatuses, commitMetadata);
-        partitionToRecordMap.put(RECORD_INDEX.getPartitionPath(), 
updatesFromWriteStatuses.union(additionalUpdates));
+        HoodieData<HoodieRecord> additionalUpdates = 
getRecordIndexAdditionalUpserts(partitionToRecordMap.get(MetadataPartitionType.RECORD_INDEX.getPartitionPath()),
 commitMetadata);
+        partitionToRecordMap.put(RECORD_INDEX.getPartitionPath(), 
partitionToRecordMap.get(MetadataPartitionType.RECORD_INDEX.getPartitionPath()).union(additionalUpdates));
       }
       updateFunctionalIndexIfPresent(commitMetadata, instantTime, 
partitionToRecordMap);
-      updateSecondaryIndexIfPresent(commitMetadata, partitionToRecordMap, 
writeStatus);
+      updateSecondaryIndexIfPresent(commitMetadata, partitionToRecordMap, 
instantTime);
       return partitionToRecordMap;
     });
     closeInternal();
@@ -1079,7 +1076,8 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
               engineContext, dataWriteConfig, commitMetadata, instantTime, 
dataMetaClient,
               enabledPartitionTypes, dataWriteConfig.getBloomFilterType(),
               dataWriteConfig.getBloomIndexParallelism(), 
dataWriteConfig.isMetadataColumnStatsIndexEnabled(),
-              dataWriteConfig.getColumnStatsIndexParallelism(), 
dataWriteConfig.getColumnsEnabledForColumnStatsIndex(), 
dataWriteConfig.getMetadataConfig());
+              dataWriteConfig.getColumnStatsIndexParallelism(), 
dataWriteConfig.getColumnsEnabledForColumnStatsIndex(),
+              dataWriteConfig.getWritesFileIdEncoding(), 
dataWriteConfig.getMetadataConfig());
       HoodieData<HoodieRecord> additionalUpdates = 
getRecordIndexAdditionalUpserts(records, commitMetadata);
       partitionToRecordMap.put(RECORD_INDEX.getPartitionPath(), 
records.union(additionalUpdates));
       updateFunctionalIndexIfPresent(commitMetadata, instantTime, 
partitionToRecordMap);
@@ -1126,7 +1124,8 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
     return getFunctionalIndexRecords(partitionFilePathPairs, indexDefinition, 
dataMetaClient, parallelism, readerSchema, storageConf, instantTime);
   }
 
-  private void updateSecondaryIndexIfPresent(HoodieCommitMetadata 
commitMetadata, Map<String, HoodieData<HoodieRecord>> partitionToRecordMap, 
HoodieData<WriteStatus> writeStatus) {
+  private void updateSecondaryIndexIfPresent(HoodieCommitMetadata 
commitMetadata, Map<String, HoodieData<HoodieRecord>> partitionToRecordMap,
+                                             String instantTime) {
     if (!dataWriteConfig.isSecondaryIndexEnabled()) {
       return;
     }
@@ -1145,7 +1144,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
         .forEach(partition -> {
           HoodieData<HoodieRecord> secondaryIndexRecords;
           try {
-            secondaryIndexRecords = getSecondaryIndexUpdates(commitMetadata, 
partition, writeStatus);
+            secondaryIndexRecords = getSecondaryIndexUpdates(commitMetadata, 
partition, instantTime);
           } catch (Exception e) {
             throw new HoodieMetadataException("Failed to get secondary index 
updates for partition " + partition, e);
           }
@@ -1153,20 +1152,13 @@ public abstract class 
HoodieBackedTableMetadataWriter<I> implements HoodieTableM
         });
   }
 
-  private HoodieData<HoodieRecord> 
getSecondaryIndexUpdates(HoodieCommitMetadata commitMetadata, String 
indexPartition, HoodieData<WriteStatus> writeStatus) throws Exception {
+  private HoodieData<HoodieRecord> 
getSecondaryIndexUpdates(HoodieCommitMetadata commitMetadata, String 
indexPartition, String instantTime) throws Exception {
     List<Pair<String, Pair<String, List<String>>>> partitionFilePairs = 
getPartitionFilePairs(commitMetadata);
     // Build a list of keys that need to be removed. A 'delete' record will be 
emitted into the respective FileGroup of
-    // the secondary index partition for each of these keys. For a commit 
which is deleting/updating a lot of records, this
-    // operation is going to be expensive (in CPU, memory and IO)
-    List<String> keysToRemove = new ArrayList<>();
-    writeStatus.collectAsList().forEach(status -> {
-      status.getWrittenRecordDelegates().forEach(recordDelegate -> {
-        // Consider those keys which were either updated or deleted in this 
commit
-        if (!recordDelegate.getNewLocation().isPresent() || 
(recordDelegate.getCurrentLocation().isPresent() && 
recordDelegate.getNewLocation().isPresent())) {
-          keysToRemove.add(recordDelegate.getRecordKey());
-        }
-      });
-    });
+    // the secondary index partition for each of these keys.
+    List<String> keysToRemove = 
HoodieTableMetadataUtil.getRecordKeysDeletedOrUpdated(engineContext, 
commitMetadata, dataWriteConfig.getMetadataConfig(),
+        dataMetaClient, instantTime);
+
     HoodieIndexDefinition indexDefinition = 
getFunctionalIndexDefinition(indexPartition);
     // Fetch the secondary keys that each of the record keys ('keysToRemove') 
maps to
     // This is obtained by scanning the entire secondary index partition in 
the metadata table
@@ -1671,51 +1663,6 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
     }
   }
 
-  /**
-   * Return records that represent upserts to the record index due to write 
operation on the dataset.
-   *
-   * @param writeStatuses {@code WriteStatus} from the write operation
-   */
-  private HoodieData<HoodieRecord> 
getRecordIndexUpserts(HoodieData<WriteStatus> writeStatuses) {
-    return writeStatuses.flatMap(writeStatus -> {
-      List<HoodieRecord> recordList = new LinkedList<>();
-      for (HoodieRecordDelegate recordDelegate : 
writeStatus.getWrittenRecordDelegates()) {
-        if (!writeStatus.isErrored(recordDelegate.getHoodieKey())) {
-          if (recordDelegate.getIgnoreIndexUpdate()) {
-            continue;
-          }
-          HoodieRecord hoodieRecord;
-          Option<HoodieRecordLocation> newLocation = 
recordDelegate.getNewLocation();
-          if (newLocation.isPresent()) {
-            if (recordDelegate.getCurrentLocation().isPresent()) {
-              // This is an update, no need to update index if the location 
has not changed
-              // newLocation should have the same fileID as currentLocation. 
The instantTimes differ as newLocation's
-              // instantTime refers to the current commit which was completed.
-              if 
(!recordDelegate.getCurrentLocation().get().getFileId().equals(newLocation.get().getFileId()))
 {
-                final String msg = String.format("Detected update in location 
of record with key %s from %s to %s. The fileID should not change.",
-                    recordDelegate, recordDelegate.getCurrentLocation().get(), 
newLocation.get());
-                LOG.error(msg);
-                throw new HoodieMetadataException(msg);
-              }
-              // for updates, we can skip updating RLI partition in MDT
-            } else {
-              // Insert new record case
-              hoodieRecord = HoodieMetadataPayload.createRecordIndexUpdate(
-                  recordDelegate.getRecordKey(), 
recordDelegate.getPartitionPath(),
-                  newLocation.get().getFileId(), 
newLocation.get().getInstantTime(), dataWriteConfig.getWritesFileIdEncoding());
-              recordList.add(hoodieRecord);
-            }
-          } else {
-            // Delete existing index for a deleted record
-            hoodieRecord = 
HoodieMetadataPayload.createRecordIndexDelete(recordDelegate.getRecordKey());
-            recordList.add(hoodieRecord);
-          }
-        }
-      }
-      return recordList.iterator();
-    });
-  }
-
   private HoodieData<HoodieRecord> 
getRecordIndexReplacedRecords(HoodieReplaceCommitMetadata 
replaceCommitMetadata) {
     try (HoodieMetadataFileSystemView fsView = getMetadataView()) {
       List<Pair<String, HoodieBaseFile>> partitionBaseFilePairs = 
replaceCommitMetadata
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java
index 7578949a8e0..119c97f0fce 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java
@@ -22,7 +22,6 @@ import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
 import org.apache.hudi.avro.model.HoodieRestoreMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
-import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -57,16 +56,15 @@ public interface HoodieTableMetadataWriter extends 
Serializable, AutoCloseable {
 
   /**
    * Update the metadata table due to a COMMIT operation.
-   *
    * @param commitMetadata commit metadata of the operation of interest.
    * @param instantTime    instant time of the commit.
    */
-  void updateFromWriteStatuses(HoodieCommitMetadata commitMetadata, 
HoodieData<WriteStatus> writeStatuses, String instantTime);
+  void update(HoodieCommitMetadata commitMetadata, String instantTime);
 
   /**
    * Update the metadata table due to a COMMIT or REPLACECOMMIT operation.
-   * As compared to {@link #updateFromWriteStatuses(HoodieCommitMetadata, 
HoodieData, String)}, this method
-   * directly updates metadata with the given records, instead of first 
converting {@link WriteStatus} to {@link HoodieRecord}.
+   * As compared to {@link #update(HoodieCommitMetadata, String)}, this method
+   * directly updates metadata with the given records, instead of generating 
HoodieRecords based on HoodieCommitMetadata.
    *
    * @param commitMetadata commit metadata of the operation of interest.
    * @param records        records to update metadata with.
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
index c4ca5677832..5940129c91d 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
@@ -21,8 +21,6 @@ package org.apache.hudi.table.action;
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieRestoreMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
-import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.WriteOperationType;
@@ -72,7 +70,7 @@ public abstract class BaseActionExecutor<T, I, K, O, R> 
implements Serializable
    *
    * @param metadata commit metadata of interest.
    */
-  protected final void writeTableMetadata(HoodieCommitMetadata metadata, 
HoodieData<WriteStatus> writeStatus, String actionType) {
+  protected final void writeTableMetadata(HoodieCommitMetadata metadata, 
String actionType) {
     // Recreate MDT for insert_overwrite_table operation.
     if (table.getConfig().isMetadataTableEnabled()
         && WriteOperationType.INSERT_OVERWRITE_TABLE == 
metadata.getOperationType()) {
@@ -83,7 +81,7 @@ public abstract class BaseActionExecutor<T, I, K, O, R> 
implements Serializable
     Option<HoodieTableMetadataWriter> metadataWriterOpt = 
table.getMetadataWriter(instantTime);
     if (metadataWriterOpt.isPresent()) {
       try (HoodieTableMetadataWriter metadataWriter = metadataWriterOpt.get()) 
{
-        metadataWriter.updateFromWriteStatuses(metadata, writeStatus, 
instantTime);
+        metadataWriter.update(metadata, instantTime);
       } catch (Exception e) {
         if (e instanceof HoodieException) {
           throw (HoodieException) e;
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
index ea20983819f..055fd7e10ba 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
@@ -211,7 +211,7 @@ public abstract class BaseCommitActionExecutor<T, I, K, O, 
R>
 
   protected abstract void commit(HoodieWriteMetadata<O> result);
 
-  protected void commit(HoodieData<WriteStatus> writeStatuses, 
HoodieWriteMetadata<O> result, List<HoodieWriteStat> writeStats) {
+  protected void commit(HoodieWriteMetadata<O> result, List<HoodieWriteStat> 
writeStats) {
     String actionType = getCommitActionType();
     LOG.info("Committing " + instantTime + ", action Type " + actionType + ", 
operation Type " + operationType);
     result.setCommitted(true);
@@ -222,7 +222,7 @@ public abstract class BaseCommitActionExecutor<T, I, K, O, 
R>
       HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
       HoodieCommitMetadata metadata = result.getCommitMetadata().get();
 
-      writeTableMetadata(metadata, writeStatuses, actionType);
+      writeTableMetadata(metadata, actionType);
       // cannot serialize maps with null values
       metadata.getExtraMetadata().entrySet().removeIf(entry -> 
entry.getValue() == null);
       activeTimeline.saveAsComplete(false,
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/WriteStatBasedIndexingCatchupTask.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/WriteStatBasedIndexingCatchupTask.java
index 727028e1eb6..25230049a76 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/WriteStatBasedIndexingCatchupTask.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/WriteStatBasedIndexingCatchupTask.java
@@ -50,6 +50,6 @@ public class WriteStatBasedIndexingCatchupTask extends 
AbstractIndexingCatchupTa
   public void updateIndexForWriteAction(HoodieInstant instant) throws 
IOException {
     HoodieCommitMetadata commitMetadata = 
metaClient.getCommitMetadataSerDe().deserialize(instant,
         metaClient.getActiveTimeline().getInstantDetails(instant).get(), 
HoodieCommitMetadata.class);
-    metadataWriter.updateFromWriteStatuses(commitMetadata, 
engineContext.emptyHoodieData(), instant.requestedTime());
+    metadataWriter.update(commitMetadata, instant.requestedTime());
   }
 }
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/common/testutils/HoodieMetadataTestTable.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/common/testutils/HoodieMetadataTestTable.java
index b4a2f5a2d46..14c4885b4d0 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/common/testutils/HoodieMetadataTestTable.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/common/testutils/HoodieMetadataTestTable.java
@@ -92,7 +92,7 @@ public class HoodieMetadataTestTable extends HoodieTestTable {
         partitionToFilesNameLengthMap, bootstrap, true);
     if (writer != null && !createInflightCommit) {
       writer.performTableServices(Option.of(commitTime));
-      writer.updateFromWriteStatuses(commitMetadata, 
context.get().emptyHoodieData(), commitTime);
+      writer.update(commitMetadata, commitTime);
     }
     // DT should be committed after MDT.
     if (!createInflightCommit) {
@@ -110,7 +110,7 @@ public class HoodieMetadataTestTable extends 
HoodieTestTable {
   public HoodieTestTable moveInflightCommitToComplete(String instantTime, 
HoodieCommitMetadata metadata) throws IOException {
     super.moveInflightCommitToComplete(instantTime, metadata);
     if (writer != null) {
-      writer.updateFromWriteStatuses(metadata, 
context.get().emptyHoodieData(), instantTime);
+      writer.update(metadata, instantTime);
     }
     return this;
   }
@@ -119,7 +119,7 @@ public class HoodieMetadataTestTable extends 
HoodieTestTable {
   public HoodieTestTable moveInflightCompactionToComplete(String instantTime, 
HoodieCommitMetadata metadata) throws IOException {
     super.moveInflightCompactionToComplete(instantTime, metadata);
     if (writer != null) {
-      writer.updateFromWriteStatuses(metadata, 
context.get().emptyHoodieData(), instantTime);
+      writer.update(metadata, instantTime);
     }
     return this;
   }
@@ -146,7 +146,7 @@ public class HoodieMetadataTestTable extends 
HoodieTestTable {
   public HoodieTestTable addCompaction(String instantTime, 
HoodieCommitMetadata commitMetadata) throws Exception {
     super.addCompaction(instantTime, commitMetadata);
     if (writer != null) {
-      writer.updateFromWriteStatuses(commitMetadata, 
context.get().emptyHoodieData(), instantTime);
+      writer.update(commitMetadata, instantTime);
     }
     return this;
   }
@@ -177,7 +177,7 @@ public class HoodieMetadataTestTable extends 
HoodieTestTable {
       HoodieReplaceCommitMetadata completeReplaceMetadata) throws Exception {
     super.addReplaceCommit(instantTime, requestedReplaceMetadata, 
inflightReplaceMetadata, completeReplaceMetadata);
     if (writer != null) {
-      writer.updateFromWriteStatuses(completeReplaceMetadata, 
context.get().emptyHoodieData(), instantTime);
+      writer.update(completeReplaceMetadata, instantTime);
     }
     return this;
   }
@@ -187,7 +187,7 @@ public class HoodieMetadataTestTable extends 
HoodieTestTable {
                                     HoodieReplaceCommitMetadata 
completeReplaceMetadata) throws Exception {
     super.addCluster(instantTime, requestedReplaceMetadata, 
inflightReplaceMetadata, completeReplaceMetadata);
     if (writer != null) {
-      writer.updateFromWriteStatuses(completeReplaceMetadata, 
context.get().emptyHoodieData(), instantTime);
+      writer.update(completeReplaceMetadata, instantTime);
     }
     return this;
   }
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java
index 5fac2d9a037..3874acdd429 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java
@@ -20,8 +20,6 @@ package org.apache.hudi.client;
 
 import org.apache.hudi.client.common.HoodieFlinkEngineContext;
 import org.apache.hudi.client.embedded.EmbeddedTimelineService;
-import org.apache.hudi.common.data.HoodieData;
-import org.apache.hudi.common.data.HoodieListData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieKey;
@@ -88,7 +86,7 @@ public class HoodieFlinkTableServiceClient<T> extends 
BaseHoodieTableServiceClie
       // commit to data table after committing to metadata table.
       // Do not do any conflict resolution here as we do with regular writes. 
We take the lock here to ensure all writes to metadata table happens within a
       // single lock (single writer). Because more than one write to metadata 
table will result in conflicts since all of them updates the same partition.
-      writeTableMetadata(table, compactionCommitTime, metadata, 
context.emptyHoodieData());
+      writeTableMetadata(table, compactionCommitTime, metadata);
       LOG.info("Committing Compaction {} finished with result {}.", 
compactionCommitTime, metadata);
       CompactHelpers.getInstance().completeInflightCompaction(table, 
compactionCommitTime, metadata);
     } finally {
@@ -113,8 +111,7 @@ public class HoodieFlinkTableServiceClient<T> extends 
BaseHoodieTableServiceClie
   protected void completeClustering(
       HoodieReplaceCommitMetadata metadata,
       HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, 
List<WriteStatus>> table,
-      String clusteringCommitTime,
-      Option<HoodieData<WriteStatus>> writeStatuses) {
+      String clusteringCommitTime) {
     this.context.setJobStatus(this.getClass().getSimpleName(), "Collect 
clustering write status and commit clustering");
     HoodieInstant clusteringInstant = 
ClusteringUtils.getInflightClusteringInstant(clusteringCommitTime, 
table.getActiveTimeline(), table.getInstantGenerator()).get();
     List<HoodieWriteStat> writeStats = 
metadata.getPartitionToWriteStats().entrySet().stream().flatMap(e ->
@@ -135,7 +132,7 @@ public class HoodieFlinkTableServiceClient<T> extends 
BaseHoodieTableServiceClie
       // commit to data table after committing to metadata table.
       // We take the lock here to ensure all writes to metadata table happens 
within a single lock (single writer).
       // Because more than one write to metadata table will result in 
conflicts since all of them updates the same partition.
-      writeTableMetadata(table, clusteringCommitTime, metadata, 
writeStatuses.orElseGet(context::emptyHoodieData));
+      writeTableMetadata(table, clusteringCommitTime, metadata);
 
       LOG.info("Committing Clustering {} finished with result {}.", 
clusteringCommitTime, metadata);
       ClusteringUtils.transitionClusteringOrReplaceInflightToComplete(
@@ -180,11 +177,6 @@ public class HoodieFlinkTableServiceClient<T> extends 
BaseHoodieTableServiceClie
     return writeMetadata;
   }
 
-  @Override
-  protected HoodieData<WriteStatus> 
convertToWriteStatus(HoodieWriteMetadata<List<WriteStatus>> writeMetadata) {
-    return HoodieListData.eager(writeMetadata.getWriteStatuses());
-  }
-
   @Override
   protected HoodieTable createTable(HoodieWriteConfig config, 
StorageConfiguration<?> storageConf, boolean skipValidation) {
     return createTableAndValidate(config, HoodieFlinkTable::create, 
skipValidation);
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index 3bac02db25a..ba3570a0b49 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -20,7 +20,6 @@ package org.apache.hudi.client;
 
 import org.apache.hudi.client.common.HoodieFlinkEngineContext;
 import org.apache.hudi.client.utils.TransactionUtils;
-import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.data.HoodieListData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
@@ -109,7 +108,7 @@ public class HoodieFlinkWriteClient<T> extends
         .values().stream()
         .map(duplicates -> 
duplicates.stream().reduce(WriteStatMerger::merge).get())
         .collect(Collectors.toList());
-    return commitStats(instantTime, HoodieListData.eager(writeStatuses), 
merged, extraMetadata, commitActionType, partitionToReplacedFileIds, 
extraPreCommitFunc);
+    return commitStats(instantTime, merged, extraMetadata, commitActionType, 
partitionToReplacedFileIds, extraPreCommitFunc);
   }
 
   @Override
@@ -375,9 +374,8 @@ public class HoodieFlinkWriteClient<T> extends
   private void completeClustering(
       HoodieReplaceCommitMetadata metadata,
       HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, 
List<WriteStatus>> table,
-      String clusteringCommitTime,
-      Option<HoodieData<WriteStatus>> writeStatuses) {
-    ((HoodieFlinkTableServiceClient<T>) 
tableServiceClient).completeClustering(metadata, table, clusteringCommitTime, 
writeStatuses);
+      String clusteringCommitTime) {
+    ((HoodieFlinkTableServiceClient<T>) 
tableServiceClient).completeClustering(metadata, table, clusteringCommitTime);
   }
 
   @Override
@@ -394,11 +392,10 @@ public class HoodieFlinkWriteClient<T> extends
       TableServiceType tableServiceType,
       HoodieCommitMetadata metadata,
       HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, 
List<WriteStatus>> table,
-      String commitInstant,
-      Option<HoodieData<WriteStatus>> writeStatuses) {
+      String commitInstant) {
     switch (tableServiceType) {
       case CLUSTER:
-        completeClustering((HoodieReplaceCommitMetadata) metadata, table, 
commitInstant, writeStatuses);
+        completeClustering((HoodieReplaceCommitMetadata) metadata, table, 
commitInstant);
         break;
       case COMPACT:
         completeCompaction(metadata, table, commitInstant);
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
index 750ef71bc37..7a27830274f 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
@@ -19,7 +19,6 @@
 package org.apache.hudi.table.action.commit;
 
 import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.data.HoodieListData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -123,8 +122,7 @@ public abstract class BaseFlinkCommitActionExecutor<T> 
extends
 
   @Override
   protected void commit(HoodieWriteMetadata<List<WriteStatus>> result) {
-    commit(HoodieListData.eager(result.getWriteStatuses()), result,
-        
result.getWriteStatuses().stream().map(WriteStatus::getStat).collect(Collectors.toList()));
+    commit(result, 
result.getWriteStatuses().stream().map(WriteStatus::getStat).collect(Collectors.toList()));
   }
 
   protected void setCommitMetadata(HoodieWriteMetadata<List<WriteStatus>> 
result) {
diff --git 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaTableServiceClient.java
 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaTableServiceClient.java
index 8dfbcae32ee..ac463e69b0d 100644
--- 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaTableServiceClient.java
+++ 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaTableServiceClient.java
@@ -21,8 +21,6 @@ package org.apache.hudi.client;
 import org.apache.hudi.avro.model.HoodieClusteringGroup;
 import org.apache.hudi.avro.model.HoodieClusteringPlan;
 import org.apache.hudi.client.embedded.EmbeddedTimelineService;
-import org.apache.hudi.common.data.HoodieData;
-import org.apache.hudi.common.data.HoodieListData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.ClusteringUtils;
@@ -64,11 +62,6 @@ public class HoodieJavaTableServiceClient<T> extends 
BaseHoodieTableServiceClien
     return writeMetadata;
   }
 
-  @Override
-  protected HoodieData<WriteStatus> 
convertToWriteStatus(HoodieWriteMetadata<List<WriteStatus>> writeMetadata) {
-    return HoodieListData.eager(writeMetadata.getWriteStatuses());
-  }
-
   @Override
   protected HoodieTable<?, List<HoodieRecord<T>>, ?, List<WriteStatus>> 
createTable(HoodieWriteConfig config, StorageConfiguration<?> storageConf, 
boolean skipValidation) {
     return createTableAndValidate(config, HoodieJavaTable::create, 
skipValidation);
diff --git 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
index 968454a1b3d..4742c25c557 100644
--- 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
+++ 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
@@ -88,7 +88,7 @@ public class HoodieJavaWriteClient<T> extends
                         Map<String, List<String>> partitionToReplacedFileIds,
                         Option<BiConsumer<HoodieTableMetaClient, 
HoodieCommitMetadata>> extraPreCommitFunc) {
     List<HoodieWriteStat> writeStats = 
writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList());
-    return commitStats(instantTime, HoodieListData.eager(writeStatuses), 
writeStats, extraMetadata, commitActionType, partitionToReplacedFileIds,
+    return commitStats(instantTime, writeStats, extraMetadata, 
commitActionType, partitionToReplacedFileIds,
         extraPreCommitFunc);
   }
 
diff --git 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java
 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java
index c968a1cdab7..f447f630d80 100644
--- 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java
+++ 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java
@@ -189,8 +189,7 @@ public abstract class BaseJavaCommitActionExecutor<T> 
extends
 
   @Override
   protected void commit(HoodieWriteMetadata<List<WriteStatus>> result) {
-    commit(HoodieListData.eager(result.getWriteStatuses()), result,
-        
result.getWriteStatuses().stream().map(WriteStatus::getStat).collect(Collectors.toList()));
+    commit(result, 
result.getWriteStatuses().stream().map(WriteStatus::getStat).collect(Collectors.toList()));
   }
 
   protected void setCommitMetadata(HoodieWriteMetadata<List<WriteStatus>> 
result) {
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDTableServiceClient.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDTableServiceClient.java
index 2f313b1ddd7..e7865064791 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDTableServiceClient.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDTableServiceClient.java
@@ -64,11 +64,6 @@ public class SparkRDDTableServiceClient<T> extends 
BaseHoodieTableServiceClient<
     return 
writeMetadata.clone(HoodieJavaRDD.getJavaRDD(writeMetadata.getWriteStatuses()));
   }
 
-  @Override
-  protected HoodieData<WriteStatus> 
convertToWriteStatus(HoodieWriteMetadata<HoodieData<WriteStatus>> 
writeMetadata) {
-    return writeMetadata.getWriteStatuses();
-  }
-
   @Override
   protected HoodieTable<?, HoodieData<HoodieRecord<T>>, ?, 
HoodieData<WriteStatus>> createTable(HoodieWriteConfig config, 
StorageConfiguration<?> storageConf, boolean skipValidation) {
     return createTableAndValidate(config, HoodieSparkTable::create, 
skipValidation);
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
index 2e639b28192..12c9a0f80b7 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
@@ -90,7 +90,7 @@ public class SparkRDDWriteClient<T> extends
                         Option<BiConsumer<HoodieTableMetaClient, 
HoodieCommitMetadata>> extraPreCommitFunc) {
     context.setJobStatus(this.getClass().getSimpleName(), "Committing stats: " 
+ config.getTableName());
     List<HoodieWriteStat> writeStats = 
writeStatuses.map(WriteStatus::getStat).collect();
-    return commitStats(instantTime, HoodieJavaRDD.of(writeStatuses), 
writeStats, extraMetadata, commitActionType, partitionToReplacedFileIds, 
extraPreCommitFunc);
+    return commitStats(instantTime, writeStats, extraMetadata, 
commitActionType, partitionToReplacedFileIds, extraPreCommitFunc);
   }
 
   @Override
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java
index e7175c55d9f..9b842064000 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java
@@ -214,7 +214,7 @@ public class SparkBootstrapCommitActionExecutor<T>
       LOG.info("Finished writing bootstrap index for source " + 
config.getBootstrapSourceBasePath() + " in table "
           + config.getBasePath());
     }
-    commit(result.getWriteStatuses(), result, 
bootstrapSourceAndStats.values().stream()
+    commit(result, bootstrapSourceAndStats.values().stream()
         .flatMap(f -> 
f.stream().map(Pair::getValue)).collect(Collectors.toList()));
     LOG.info("Committing metadata bootstrap !!");
   }
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
index 36902a8c3f2..94d8748166b 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
@@ -289,7 +289,7 @@ public abstract class BaseSparkCommitActionExecutor<T> 
extends
   @Override
   protected void commit(HoodieWriteMetadata<HoodieData<WriteStatus>> result) {
     context.setJobStatus(this.getClass().getSimpleName(), "Commit write status 
collect: " + config.getTableName());
-    commit(result.getWriteStatuses(), result, 
result.getWriteStats().isPresent()
+    commit(result, result.getWriteStats().isPresent()
         ? result.getWriteStats().get() : 
result.getWriteStatuses().map(WriteStatus::getStat).collectAsList());
   }
 
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDWriteClient.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDWriteClient.java
index a299b2f337a..303f83241f7 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDWriteClient.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDWriteClient.java
@@ -140,7 +140,7 @@ class TestSparkRDDWriteClient extends 
SparkClientFunctionalTestHarness {
     String metadataTableBasePath = 
HoodieTableMetadata.getMetadataTableBasePath(writeConfig.getBasePath());
     List<Integer> metadataTableCacheIds0 = 
context().getCachedDataIds(HoodieDataCacheKey.of(metadataTableBasePath, 
instant0));
     List<Integer> metadataTableCacheIds1 = 
context().getCachedDataIds(HoodieDataCacheKey.of(metadataTableBasePath, 
instant1));
-    writeClient.commitStats(instant1, context().parallelize(writeStatuses, 1), 
writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
+    writeClient.commitStats(instant1, 
writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
         Option.empty(), metaClient.getCommitActionType());
     writeClient.close();
 
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestConsistentBucketIndex.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestConsistentBucketIndex.java
index c790b3cebdc..812c17d3743 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestConsistentBucketIndex.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestConsistentBucketIndex.java
@@ -275,7 +275,7 @@ public class TestConsistentBucketIndex extends 
HoodieSparkClientTestHarness {
     }
     org.apache.hudi.testutils.Assertions.assertNoWriteErrors(writeStatues);
     if (doCommit) {
-      boolean success = writeClient.commitStats(commitTime, 
context.parallelize(writeStatues, 1), 
writeStatues.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
+      boolean success = writeClient.commitStats(commitTime, 
writeStatues.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
           Option.empty(), metaClient.getCommitActionType());
       Assertions.assertTrue(success);
     }
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestMetadataUtilRLIandSIRecordGeneration.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestMetadataUtilRLIandSIRecordGeneration.java
new file mode 100644
index 00000000000..1a3c4bfa0e7
--- /dev/null
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestMetadataUtilRLIandSIRecordGeneration.java
@@ -0,0 +1,506 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.client.functional;
+
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.util.CommitUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.metadata.BaseFileRecordParsingUtils;
+import org.apache.hudi.metadata.HoodieMetadataPayload;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+import org.apache.hudi.testutils.HoodieClientTestBase;
+
+import org.apache.avro.Schema;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class TestMetadataUtilRLIandSIRecordGeneration extends 
HoodieClientTestBase {
+
+  /**
+   * Tests various methods used for RLI and SI record generation flows.
+   * We test below methods
+   * 
BaseFileRecordParsingUtils.generateRLIMetadataHoodieRecordsForBaseFile(...). 
This is used for RLI record generation.
+   * BaseFileRecordParsingUtils.getRecordKeyStatuses(...) // This is used in 
both RLI and SI flow.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testRecordGenerationAPIsForCOW() throws IOException {
+    HoodieTableType tableType = HoodieTableType.COPY_ON_WRITE;
+    cleanupClients();
+    initMetaClient(tableType);
+    cleanupTimelineService();
+    initTimelineService();
+
+    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
+    HoodieWriteConfig writeConfig = 
getConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER).build();
+
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, 
writeConfig)) {
+      // Insert
+      String commitTime = client.createNewInstantTime();
+      List<HoodieRecord> records1 = dataGen.generateInserts(commitTime, 100);
+      client.startCommitWithTime(commitTime);
+      List<WriteStatus> writeStatuses1 = 
client.insert(jsc.parallelize(records1, 1), commitTime).collect();
+      assertNoWriteErrors(writeStatuses1);
+
+      // assert RLI records for a base file from 1st commit
+      String finalCommitTime = commitTime;
+      Map<String, String> recordKeyToPartitionMapping1 = new HashMap<>();
+      Map<String, String> fileIdToFileNameMapping1 = new HashMap<>();
+      writeStatuses1.forEach(writeStatus -> {
+        assertEquals(writeStatus.getStat().getNumDeletes(), 0);
+        // Fetch record keys for all
+        try {
+          String writeStatFileId = writeStatus.getFileId();
+          if (!fileIdToFileNameMapping1.containsKey(writeStatFileId)) {
+            fileIdToFileNameMapping1.put(writeStatFileId, 
writeStatus.getStat().getPath().substring(writeStatus.getStat().getPath().lastIndexOf("/")
 + 1));
+          }
+
+          // Call into generateRLIMetadataHoodieRecordsForBaseFile to fetch MDT RLI records for inserts and deletes.
+          Iterator<HoodieRecord> rliRecordsItr = 
BaseFileRecordParsingUtils.generateRLIMetadataHoodieRecordsForBaseFile(metaClient.getBasePath().toString(),
+              writeStatus.getStat(), writeConfig.getWritesFileIdEncoding(), 
finalCommitTime, metaClient.getStorage());
+          while (rliRecordsItr.hasNext()) {
+            HoodieRecord rliRecord = rliRecordsItr.next();
+            String key = rliRecord.getRecordKey();
+            String partition = ((HoodieMetadataPayload) 
rliRecord.getData()).getRecordGlobalLocation().getPartitionPath();
+            recordKeyToPartitionMapping1.put(key, partition);
+          }
+        } catch (IOException e) {
+          throw new HoodieException("Should not have failed ", e);
+        }
+      });
+
+      Map<String, String> expectedRecordToPartitionMapping1 = new HashMap<>();
+      records1.forEach(record -> 
expectedRecordToPartitionMapping1.put(record.getRecordKey(), 
record.getPartitionPath()));
+
+      assertEquals(expectedRecordToPartitionMapping1, 
recordKeyToPartitionMapping1);
+
+      // Let's update some records and assert the RLI records.
+      commitTime = client.createNewInstantTime();
+      client.startCommitWithTime(commitTime);
+      String finalCommitTime2 = commitTime;
+      List<HoodieRecord> deletes2 = 
dataGen.generateUniqueDeleteRecords(commitTime, 30);
+      List<HoodieRecord> updates2 = dataGen.generateUniqueUpdates(commitTime, 
30);
+      List<HoodieRecord> inserts2 = dataGen.generateInserts(commitTime, 30);
+      List<HoodieRecord> records2 = new ArrayList<>();
+      records2.addAll(inserts2);
+      records2.addAll(updates2);
+      records2.addAll(deletes2);
+
+      List<WriteStatus> writeStatuses2 = 
client.upsert(jsc.parallelize(records2, 1), commitTime).collect();
+      assertNoWriteErrors(writeStatuses2);
+
+      List<String> expectedInserts = inserts2.stream().map(record -> 
record.getKey().getRecordKey()).collect(Collectors.toList());
+      List<String> expectedDeletes = deletes2.stream().map(record -> 
record.getKey().getRecordKey()).collect(Collectors.toList());
+      List<String> actualInserts = new ArrayList<>();
+      List<String> actualDeletes = new ArrayList<>();
+      // only inserts and deletes will result in RLI records. lets validate 
that.
+      generateRliRecordsAndAssert(writeStatuses2, fileIdToFileNameMapping1, 
finalCommitTime2, writeConfig, actualInserts, actualDeletes);
+
+      assertListEquality(expectedInserts, actualInserts);
+      assertListEquality(expectedDeletes, actualDeletes);
+
+      // lets validate APIs in BaseFileParsingUtils directly
+      actualInserts = new ArrayList<>();
+      actualDeletes = new ArrayList<>();
+      List<String> actualUpdates = new ArrayList<>();
+      List<String> expectedUpdates = updates2.stream().map(record -> 
record.getKey().getRecordKey()).collect(Collectors.toList());
+      parseRecordKeysFromBaseFiles(writeStatuses2, fileIdToFileNameMapping1, 
finalCommitTime2, writeConfig, actualInserts, actualDeletes, actualUpdates);
+      assertListEquality(expectedInserts, actualInserts);
+      assertListEquality(expectedDeletes, actualDeletes);
+      // we can't really assert equality for updates. bcoz, w/ COW, we might 
just rewrite an existing parquet file. So, more records will be deduced as 
updates.
+      // And so, we are validating using contains.
+      expectedUpdates.forEach(entry -> 
assertTrue(actualUpdates.contains(entry)));
+    }
+  }
+
+  /**
+   * Tests various methods used for RLI and SI record generation flows w/ MOR 
table. here emphasis are given to log files.
+   * We test below methods
+   * 
BaseFileRecordParsingUtils.generateRLIMetadataHoodieRecordsForBaseFile(...). 
This is used for RLI record generation.
+   * HoodieTableMetadataUtil.getRecordKeys() // This is used in both RLI and 
SI flow.
+   * HoodieTableMetadataUtil.getRecordKeysDeletedOrUpdated() for 
HoodieCommitMetadata.
+   * <p>
+   * We also test few adhoc scenarios.
+   * - if any log files contains inserts, RLI and SI record generation should 
throw exception.
+   * - RLI do no generate any records for compaction operation.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testRecordGenerationAPIsForMOR() throws IOException {
+    HoodieTableType tableType = HoodieTableType.MERGE_ON_READ;
+    cleanupClients();
+    initMetaClient(tableType);
+    cleanupTimelineService();
+    initTimelineService();
+
+    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
+    HoodieWriteConfig writeConfig = 
getConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER)
+        
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withMaxNumDeltaCommitsBeforeCompaction(2)
+            .withInlineCompaction(true)
+            .compactionSmallFileSize(0).build()).build();
+
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, 
writeConfig)) {
+      // Insert
+      String commitTime = client.createNewInstantTime();
+      List<HoodieRecord> records1 = dataGen.generateInserts(commitTime, 100);
+      client.startCommitWithTime(commitTime);
+      List<WriteStatus> writeStatuses1 = 
client.insert(jsc.parallelize(records1, 1), commitTime).collect();
+      assertNoWriteErrors(writeStatuses1);
+
+      // assert RLI records for a base file from 1st commit
+      String finalCommitTime = commitTime;
+      Map<String, String> recordKeyToPartitionMapping1 = new HashMap<>();
+      Map<String, String> fileIdToFileNameMapping1 = new HashMap<>();
+      writeStatuses1.forEach(writeStatus -> {
+        assertEquals(writeStatus.getStat().getNumDeletes(), 0);
+        // Fetch record keys for all
+        try {
+          String writeStatFileId = writeStatus.getFileId();
+          if (!fileIdToFileNameMapping1.containsKey(writeStatFileId)) {
+            fileIdToFileNameMapping1.put(writeStatFileId, 
writeStatus.getStat().getPath().substring(writeStatus.getStat().getPath().lastIndexOf("/")
 + 1));
+          }
+
+          // poll into generateRLIMetadataHoodieRecordsForBaseFile to fetch 
MDT RLI records for inserts and deletes.
+          Iterator<HoodieRecord> rliRecordsItr = 
BaseFileRecordParsingUtils.generateRLIMetadataHoodieRecordsForBaseFile(metaClient.getBasePath().toString(),
+              writeStatus.getStat(), writeConfig.getWritesFileIdEncoding(), 
finalCommitTime, metaClient.getStorage());
+          while (rliRecordsItr.hasNext()) {
+            HoodieRecord rliRecord = rliRecordsItr.next();
+            String key = rliRecord.getRecordKey();
+            String partition = ((HoodieMetadataPayload) 
rliRecord.getData()).getRecordGlobalLocation().getPartitionPath();
+            recordKeyToPartitionMapping1.put(key, partition);
+          }
+        } catch (IOException e) {
+          throw new HoodieException("Should not have failed ", e);
+        }
+      });
+
+      Map<String, String> expectedRecordToPartitionMapping1 = new HashMap<>();
+      records1.forEach(record -> 
expectedRecordToPartitionMapping1.put(record.getRecordKey(), 
record.getPartitionPath()));
+
+      assertEquals(expectedRecordToPartitionMapping1, 
recordKeyToPartitionMapping1);
+
+      // lets update some records and assert RLI records.
+      commitTime = client.createNewInstantTime();
+      client.startCommitWithTime(commitTime);
+      List<HoodieRecord> deletes2 = 
dataGen.generateUniqueDeleteRecords(commitTime, 30);
+      List<HoodieRecord> updates2 = dataGen.generateUniqueUpdates(commitTime, 
30);
+      List<HoodieRecord> inserts2 = dataGen.generateInserts(commitTime, 30);
+      List<HoodieRecord> records2 = new ArrayList<>();
+      records2.addAll(inserts2);
+      records2.addAll(updates2);
+      records2.addAll(deletes2);
+
+      List<WriteStatus> writeStatuses2 = 
client.upsert(jsc.parallelize(records2, 1), commitTime).collect();
+      assertNoWriteErrors(writeStatuses2);
+      assertRLIandSIRecordGenerationAPIs(inserts2, updates2, deletes2, 
writeStatuses2, commitTime, writeConfig);
+
+      // trigger 2nd commit.
+      commitTime = client.createNewInstantTime();
+      client.startCommitWithTime(commitTime);
+      String finalCommitTime3 = commitTime;
+      List<HoodieRecord> deletes3 = 
dataGen.generateUniqueDeleteRecords(commitTime, 30);
+      List<HoodieRecord> updates3 = dataGen.generateUniqueUpdates(commitTime, 
30);
+      List<HoodieRecord> inserts3 = dataGen.generateInserts(commitTime, 30);
+      List<HoodieRecord> records3 = new ArrayList<>();
+      records3.addAll(inserts3);
+      records3.addAll(updates3);
+      records3.addAll(deletes3);
+
+      List<WriteStatus> writeStatuses3 = 
client.upsert(jsc.parallelize(records3, 1), commitTime).collect();
+      assertNoWriteErrors(writeStatuses3);
+      assertRLIandSIRecordGenerationAPIs(inserts3, updates3, deletes3, 
writeStatuses3, finalCommitTime3, writeConfig);
+
+      // lets validate that if any log file contains inserts, fetching keys 
will fail.
+      HoodieWriteStat writeStat = writeStatuses3.get(1).getStat();
+      writeStat.setNumInserts(5);
+      HoodieCommitMetadata commitMetadata = 
CommitUtils.buildMetadata(Collections.singletonList(writeStat), 
Collections.emptyMap(),
+          Option.empty(), WriteOperationType.UPSERT, writeConfig.getSchema(), 
"commit");
+
+      try {
+        HoodieTableMetadataUtil.getRecordKeysDeletedOrUpdated(context, 
commitMetadata, writeConfig.getMetadataConfig(), metaClient, finalCommitTime3);
+        fail("Should not have reached here");
+      } catch (Exception e) {
+        // no op
+      }
+
+      // trigger compaction
+      Option<String> compactionInstantOpt = 
client.scheduleCompaction(Option.empty());
+      assertTrue(compactionInstantOpt.isPresent());
+      HoodieWriteMetadata compactionWriteMetadata = 
client.compact(compactionInstantOpt.get());
+      HoodieCommitMetadata compactionCommitMetadata = (HoodieCommitMetadata) 
compactionWriteMetadata.getCommitMetadata().get();
+      // no RLI records should be generated for compaction operation.
+      
assertTrue(HoodieTableMetadataUtil.convertMetadataToRecordIndexRecords(context, 
compactionCommitMetadata, writeConfig.getMetadataConfig(),
+          metaClient, writeConfig.getWritesFileIdEncoding(), 
compactionInstantOpt.get()).isEmpty());
+    }
+  }
+
+  private void assertRLIandSIRecordGenerationAPIs(List<HoodieRecord> inserts3, 
List<HoodieRecord> updates3, List<HoodieRecord> deletes3,
+                                                  List<WriteStatus> 
writeStatuses3, String finalCommitTime3, HoodieWriteConfig writeConfig) {
+    List<String> expectedRLIInserts = inserts3.stream().map(record -> 
record.getKey().getRecordKey()).collect(Collectors.toList());
+    List<String> expectedUpdates = updates3.stream().map(record -> 
record.getKey().getRecordKey()).collect(Collectors.toList());
+    List<String> expectedRLIDeletes = deletes3.stream().map(record -> 
record.getKey().getRecordKey()).collect(Collectors.toList());
+    List<String> expectedUpatesAndDeletes = new 
ArrayList<>(expectedRLIDeletes);
+    expectedUpatesAndDeletes.addAll(expectedUpdates);
+
+    // lets validate RLI record generation.
+    List<String> actualInserts = new ArrayList<>();
+    List<String> actualDeletes = new ArrayList<>();
+    List<String> actualUpdatesAndDeletes = new ArrayList<>();
+    generateRliRecordsAndAssert(writeStatuses3.stream().filter(writeStatus -> 
!FSUtils.isLogFile(FSUtils.getFileName(writeStatus.getStat().getPath(), 
writeStatus.getPartitionPath())))
+        .collect(Collectors.toList()), Collections.emptyMap(), 
finalCommitTime3, writeConfig, actualInserts, actualDeletes);
+
+    // lets also test HoodieTableMetadataUtil.getRecordKeys() for each 
individual log file touched as part of HoodieCommitMetadata.
+    // lets test only deletes and also test both validat and deleted keys for 
log files.
+    // we have disabled small file handling. And so, updates and deletes will 
definitely go into log files.
+    String latestCommitTimestamp = 
metaClient.reloadActiveTimeline().getCommitsTimeline().lastInstant().get().requestedTime();
+    Option<Schema> writerSchemaOpt = tryResolveSchemaForTable(metaClient);
+    List<String> finalActualDeletes = actualDeletes;
+    writeStatuses3.stream().filter(writeStatus -> 
FSUtils.isLogFile(FSUtils.getFileName(writeStatus.getStat().getPath(), 
writeStatus.getPartitionPath())))
+        .forEach(writeStatus -> {
+          try {
+            // used for RLI
+            
finalActualDeletes.addAll(HoodieTableMetadataUtil.getRecordKeys(basePath + "/" 
+ writeStatus.getStat().getPath(), metaClient, writerSchemaOpt,
+                writeConfig.getMetadataConfig().getMaxReaderBufferSize(), 
latestCommitTimestamp, false, true));
+
+            // used in SI flow
+            
actualUpdatesAndDeletes.addAll(HoodieTableMetadataUtil.getRecordKeys(basePath + 
"/" + writeStatus.getStat().getPath(), metaClient, writerSchemaOpt,
+                writeConfig.getMetadataConfig().getMaxReaderBufferSize(), 
latestCommitTimestamp, true, true));
+          } catch (IOException e) {
+            throw new HoodieIOException("Failed w/ IOException ", e);
+          }
+        });
+
+    assertListEquality(expectedRLIInserts, actualInserts);
+    assertListEquality(expectedRLIDeletes, actualDeletes);
+    assertListEquality(expectedUpatesAndDeletes, actualUpdatesAndDeletes);
+    HoodieCommitMetadata commitMetadata = 
CommitUtils.buildMetadata(writeStatuses3.stream().map(writeStatus -> 
writeStatus.getStat()).collect(Collectors.toList()), Collections.emptyMap(),
+        Option.empty(), WriteOperationType.UPSERT, writeConfig.getSchema(), 
"commit");
+
+    // validate HoodieTableMetadataUtil.getRecordKeysDeletedOrUpdated for 
entire CommitMetadata which is used in SI code path.
+    List<String> updatedOrDeletedKeys =
+        new 
ArrayList<>(HoodieTableMetadataUtil.getRecordKeysDeletedOrUpdated(context, 
commitMetadata, writeConfig.getMetadataConfig(), metaClient, finalCommitTime3));
+    List<String> expectedUpdatesOrDeletes = new ArrayList<>(expectedUpdates);
+    expectedUpdatesOrDeletes.addAll(expectedRLIDeletes);
+    assertListEquality(expectedUpatesAndDeletes, updatedOrDeletedKeys);
+  }
+
+  @Test
+  public void testReducedByKeysForRLIRecords() throws IOException {
+    HoodieTableType tableType = HoodieTableType.COPY_ON_WRITE;
+    cleanupClients();
+    initMetaClient(tableType);
+    cleanupTimelineService();
+    initTimelineService();
+
+    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
+    HoodieWriteConfig writeConfig = 
getConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER).build();
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, 
writeConfig)) {
+      String commitTime = client.createNewInstantTime();
+      List<HoodieRecord> inserts = dataGen.generateInserts(commitTime, 100);
+      List<HoodieRecord> deletes = 
dataGen.generateUniqueDeleteRecords(commitTime, 20);
+      String randomFileId = UUID.randomUUID().toString() + "-0";
+      List<String> deletedRecordKeys = deletes.stream().map(record -> 
record.getRecordKey()).collect(Collectors.toList());
+      List<HoodieRecord> adjustedInserts = inserts.stream().filter(record -> 
!deletedRecordKeys.contains(record.getRecordKey())).collect(Collectors.toList());
+
+      List<HoodieRecord> insertRecords =
+          inserts.stream().map(record -> 
HoodieMetadataPayload.createRecordIndexUpdate(record.getRecordKey(), "abc", 
randomFileId, commitTime, 0))
+              .collect(Collectors.toList());
+      List<HoodieRecord> deleteRecords = inserts.stream().map(record -> 
HoodieMetadataPayload.createRecordIndexDelete(record.getRecordKey()))
+          .collect(Collectors.toList());
+
+      List<HoodieRecord> recordsToTest = new ArrayList<>();
+      recordsToTest.addAll(adjustedInserts);
+      recordsToTest.addAll(deleteRecords);
+      // happy paths. no dups. in and out are same.
+      List<HoodieRecord> actualRecords = 
HoodieTableMetadataUtil.reduceByKeys(context.parallelize(recordsToTest, 2), 
2).collectAsList();
+      assertHoodieRecordListEquality(actualRecords, recordsToTest);
+
+      // few records has both inserts and deletes.
+      recordsToTest = new ArrayList<>();
+      recordsToTest.addAll(insertRecords);
+      recordsToTest.addAll(deleteRecords);
+      actualRecords = 
HoodieTableMetadataUtil.reduceByKeys(context.parallelize(recordsToTest, 2), 
2).collectAsList();
+      List<HoodieRecord> expectedList = new ArrayList<>();
+      expectedList.addAll(insertRecords);
+      assertHoodieRecordListEquality(actualRecords, expectedList);
+
+      // few deletes are duplicates. we are allowed to have duplicate deletes.
+      recordsToTest = new ArrayList<>();
+      recordsToTest.addAll(adjustedInserts);
+      recordsToTest.addAll(deleteRecords);
+      recordsToTest.addAll(deleteRecords.subList(0, 10));
+      actualRecords = 
HoodieTableMetadataUtil.reduceByKeys(context.parallelize(recordsToTest, 2), 
2).collectAsList();
+      expectedList = new ArrayList<>();
+      expectedList.addAll(adjustedInserts);
+      expectedList.addAll(deleteRecords);
+      assertHoodieRecordListEquality(actualRecords, expectedList);
+
+      // test failure case. same record having 2 inserts should fail.
+      recordsToTest = new ArrayList<>();
+      recordsToTest.addAll(adjustedInserts);
+      recordsToTest.addAll(adjustedInserts.subList(0, 5));
+      try {
+        
HoodieTableMetadataUtil.reduceByKeys(context.parallelize(recordsToTest, 2), 
2).collectAsList();
+        fail("Should not have reached here");
+      } catch (Exception e) {
+        // expected. no-op
+        assertTrue(e.getCause() instanceof HoodieIOException);
+      }
+    }
+  }
+
+  private void assertHoodieRecordListEquality(List<HoodieRecord> actualList, 
List<HoodieRecord> expectedList) {
+    assertEquals(expectedList.size(), actualList.size());
+    List<String> expectedInsertRecordKeys = 
expectedList.stream().filter(record -> !(record.getData() instanceof 
EmptyHoodieRecordPayload))
+        .map(record -> record.getRecordKey()).collect(Collectors.toList());
+    List<String> expectedDeletedRecordKeys = 
expectedList.stream().filter(record -> (record.getData() instanceof 
EmptyHoodieRecordPayload))
+        .map(record -> record.getRecordKey()).collect(Collectors.toList());
+
+    List<String> actualInsertRecordKeys = actualList.stream().filter(record -> 
!(record.getData() instanceof EmptyHoodieRecordPayload))
+        .map(record -> record.getRecordKey()).collect(Collectors.toList());
+    List<String> actualDeletedRecordKeys = actualList.stream().filter(record 
-> (record.getData() instanceof EmptyHoodieRecordPayload))
+        .map(record -> record.getRecordKey()).collect(Collectors.toList());
+
+    assertListEquality(expectedInsertRecordKeys, actualInsertRecordKeys);
+    assertListEquality(expectedDeletedRecordKeys, actualDeletedRecordKeys);
+  }
+
+  private void assertListEquality(List<String> list1, List<String> list2) {
+    Collections.sort(list1);
+    Collections.sort(list2);
+    assertEquals(list1, list2);
+  }
+
+  private static Option<Schema> tryResolveSchemaForTable(HoodieTableMetaClient 
dataTableMetaClient) {
+    if 
(dataTableMetaClient.getCommitsTimeline().filterCompletedInstants().countInstants()
 == 0) {
+      return Option.empty();
+    }
+
+    try {
+      TableSchemaResolver schemaResolver = new 
TableSchemaResolver(dataTableMetaClient);
+      return Option.of(schemaResolver.getTableAvroSchema());
+    } catch (Exception e) {
+      throw new HoodieException("Failed to get latest columns for " + 
dataTableMetaClient.getBasePath(), e);
+    }
+  }
+
+  private void generateRliRecordsAndAssert(List<WriteStatus> writeStatuses, 
Map<String, String> fileIdToFileNameMapping, String commitTime,
+                                           HoodieWriteConfig writeConfig, 
List<String> actualInserts,
+                                           List<String> actualDeletes) {
+    writeStatuses.forEach(writeStatus -> {
+      if 
(!FSUtils.isLogFile(FSUtils.getFileName(writeStatus.getStat().getPath(), 
writeStatus.getPartitionPath()))) {
+        // Fetch record keys for all
+        try {
+          String writeStatFileId = writeStatus.getFileId();
+          if (!fileIdToFileNameMapping.isEmpty()) {
+            assertEquals(writeStatus.getStat().getPrevBaseFile(), 
fileIdToFileNameMapping.get(writeStatFileId));
+          }
+
+          Iterator<HoodieRecord> rliRecordsItr = 
BaseFileRecordParsingUtils.generateRLIMetadataHoodieRecordsForBaseFile(metaClient.getBasePath().toString(),
 writeStatus.getStat(),
+              writeConfig.getWritesFileIdEncoding(), commitTime, 
metaClient.getStorage());
+          while (rliRecordsItr.hasNext()) {
+            HoodieRecord rliRecord = rliRecordsItr.next();
+            String key = rliRecord.getRecordKey();
+            if (rliRecord.getData() instanceof EmptyHoodieRecordPayload) {
+              actualDeletes.add(key);
+            } else {
+              actualInserts.add(key);
+            }
+          }
+        } catch (IOException e) {
+          throw new HoodieException("Should not have failed ", e);
+        }
+      }
+    });
+  }
+
+  private void parseRecordKeysFromBaseFiles(List<WriteStatus> writeStatuses, 
Map<String, String> fileIdToFileNameMapping, String commitTime,
+                                            HoodieWriteConfig writeConfig, 
List<String> actualInserts,
+                                            List<String> actualDeletes, 
List<String> actualUpdates) {
+    writeStatuses.forEach(writeStatus -> {
+      if 
(!FSUtils.isLogFile(FSUtils.getFileName(writeStatus.getStat().getPath(), 
writeStatus.getPartitionPath()))) {
+        // Fetch record keys for all
+        try {
+          String writeStatFileId = writeStatus.getFileId();
+          if (!fileIdToFileNameMapping.isEmpty()) {
+            assertEquals(writeStatus.getStat().getPrevBaseFile(), 
fileIdToFileNameMapping.get(writeStatFileId));
+          }
+
+          String partition = writeStatus.getStat().getPartitionPath();
+          String latestFileName = 
FSUtils.getFileNameFromPath(writeStatus.getStat().getPath());
+
+          Set<BaseFileRecordParsingUtils.RecordStatus> recordStatusSet = new 
HashSet<>();
+          recordStatusSet.add(BaseFileRecordParsingUtils.RecordStatus.INSERT);
+          recordStatusSet.add(BaseFileRecordParsingUtils.RecordStatus.UPDATE);
+          recordStatusSet.add(BaseFileRecordParsingUtils.RecordStatus.DELETE);
+
+          Map<BaseFileRecordParsingUtils.RecordStatus, List<String>> 
recordKeyMappings = 
BaseFileRecordParsingUtils.getRecordKeyStatuses(metaClient.getBasePath().toString(),
 partition, latestFileName,
+              writeStatus.getStat().getPrevBaseFile(), storage, 
recordStatusSet);
+          if 
(recordKeyMappings.containsKey(BaseFileRecordParsingUtils.RecordStatus.INSERT)) 
{
+            
actualInserts.addAll(recordKeyMappings.get(BaseFileRecordParsingUtils.RecordStatus.INSERT));
+          }
+          if 
(recordKeyMappings.containsKey(BaseFileRecordParsingUtils.RecordStatus.UPDATE)) 
{
+            
actualUpdates.addAll(recordKeyMappings.get(BaseFileRecordParsingUtils.RecordStatus.UPDATE));
+          }
+          if 
(recordKeyMappings.containsKey(BaseFileRecordParsingUtils.RecordStatus.DELETE)) 
{
+            
actualDeletes.addAll(recordKeyMappings.get(BaseFileRecordParsingUtils.RecordStatus.DELETE));
+          }
+        } catch (IOException e) {
+          throw new HoodieException("Should not have failed ", e);
+        }
+      }
+    });
+  }
+}
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
index a290b1f3e96..ce7b1c26c9c 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
@@ -431,7 +431,7 @@ public class TestHoodieTimelineArchiver extends 
HoodieSparkClientTestHarness {
       });
       commitMeta = generateCommitMetadata(instantTime, partToFileIds);
       metadataWriter.performTableServices(Option.of(instantTime));
-      metadataWriter.updateFromWriteStatuses(commitMeta, 
context.emptyHoodieData(), instantTime);
+      metadataWriter.update(commitMeta, instantTime);
       metaClient.getActiveTimeline().saveAsComplete(
           INSTANT_GENERATOR.createNewInstant(State.INFLIGHT, 
HoodieTimeline.COMMIT_ACTION, instantTime),
           serializeCommitMetadata(metaClient.getCommitMetadataSerDe(), 
commitMeta));
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java
index ef28980d9cf..73b9d7e8be2 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java
@@ -135,7 +135,7 @@ public class TestHoodieSparkMergeOnReadTableCompaction 
extends SparkClientFuncti
     List<WriteStatus> writeStatuses = writeData(insertTime, 100, false);
     Assertions.assertEquals(200, readTableTotalRecordsNum());
     // commit the write. The records should be visible now even though the 
compaction does not complete.
-    client.commitStats(insertTime, context().parallelize(writeStatuses, 1), 
writeStatuses.stream().map(WriteStatus::getStat)
+    client.commitStats(insertTime, 
writeStatuses.stream().map(WriteStatus::getStat)
         .collect(Collectors.toList()), Option.empty(), 
metaClient.getCommitActionType());
     Assertions.assertEquals(300, readTableTotalRecordsNum());
     // after the compaction, total records should remain the same
@@ -207,7 +207,7 @@ public class TestHoodieSparkMergeOnReadTableCompaction 
extends SparkClientFuncti
     org.apache.hudi.testutils.Assertions.assertNoWriteErrors(writeStatuses);
     if (doCommit) {
       List<HoodieWriteStat> writeStats = 
writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList());
-      boolean committed = client.commitStats(instant, 
context().parallelize(writeStatuses, 1), writeStats, Option.empty(), 
metaClient.getCommitActionType());
+      boolean committed = client.commitStats(instant, writeStats, 
Option.empty(), metaClient.getCommitActionType());
       Assertions.assertTrue(committed);
     }
     metaClient = HoodieTableMetaClient.reload(metaClient);
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestSparkNonBlockingConcurrencyControl.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestSparkNonBlockingConcurrencyControl.java
index b8f55c55a72..704d6e8420b 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestSparkNonBlockingConcurrencyControl.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestSparkNonBlockingConcurrencyControl.java
@@ -140,7 +140,6 @@ public class TestSparkNonBlockingConcurrencyControl extends 
SparkClientFunctiona
     // step to commit the 1st txn
     client1.commitStats(
         insertTime1,
-        context().parallelize(writeStatuses1, 1),
         
writeStatuses1.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
         Option.empty(),
         metaClient.getCommitActionType());
@@ -148,7 +147,6 @@ public class TestSparkNonBlockingConcurrencyControl extends 
SparkClientFunctiona
     // step to commit the 2nd txn
     client2.commitStats(
         insertTime2,
-        context().parallelize(writeStatuses2, 1),
         
writeStatuses2.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
         Option.empty(),
         metaClient.getCommitActionType());
@@ -185,7 +183,6 @@ public class TestSparkNonBlockingConcurrencyControl extends 
SparkClientFunctiona
     // step to commit the 1st txn
     client1.commitStats(
         insertTime1,
-        context().parallelize(writeStatuses1, 1),
         
writeStatuses1.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
         Option.empty(),
         metaClient.getCommitActionType());
@@ -199,7 +196,6 @@ public class TestSparkNonBlockingConcurrencyControl extends 
SparkClientFunctiona
     List<WriteStatus> writeStatuses3 = writeData(client1, insertTime3, 
dataset3, false, WriteOperationType.INSERT);
     client1.commitStats(
         insertTime3,
-        context().parallelize(writeStatuses3, 1),
         
writeStatuses3.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
         Option.empty(),
         metaClient.getCommitActionType());
@@ -233,7 +229,6 @@ public class TestSparkNonBlockingConcurrencyControl extends 
SparkClientFunctiona
       List<WriteStatus> writeStatuses0 = writeData(client0, insertTime0, 
dataset0, false, WriteOperationType.BULK_INSERT, true);
       client0.commitStats(
           insertTime0,
-          context().parallelize(writeStatuses0, 1),
           
writeStatuses0.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
           Option.empty(),
           metaClient.getCommitActionType());
@@ -273,7 +268,6 @@ public class TestSparkNonBlockingConcurrencyControl extends 
SparkClientFunctiona
     // step to commit the 1st txn
     client1.commitStats(
         insertTime1,
-        context().parallelize(writeStatuses1, 1),
         
writeStatuses1.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
         Option.empty(),
         metaClient.getCommitActionType());
@@ -281,7 +275,6 @@ public class TestSparkNonBlockingConcurrencyControl extends 
SparkClientFunctiona
     // step to commit the 2nd txn
     client2.commitStats(
         insertTime2,
-        context().parallelize(writeStatuses2, 1),
         
writeStatuses2.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
         Option.empty(),
         metaClient.getCommitActionType());
@@ -330,7 +323,6 @@ public class TestSparkNonBlockingConcurrencyControl extends 
SparkClientFunctiona
     // step to commit the 1st txn
     client1.commitStats(
         insertTime1,
-        context().parallelize(writeStatuses1, 1),
         
writeStatuses1.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
         Option.empty(),
         metaClient.getCommitActionType());
@@ -339,7 +331,6 @@ public class TestSparkNonBlockingConcurrencyControl extends 
SparkClientFunctiona
     assertThrows(HoodieWriteConflictException.class, () -> {
       client2.commitStats(
           insertTime2,
-          context().parallelize(writeStatuses2, 1),
           
writeStatuses2.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
           Option.empty(),
           metaClient.getCommitActionType());
@@ -552,7 +543,7 @@ public class TestSparkNonBlockingConcurrencyControl extends 
SparkClientFunctiona
     org.apache.hudi.testutils.Assertions.assertNoWriteErrors(writeStatuses);
     if (doCommit) {
       List<HoodieWriteStat> writeStats = 
writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList());
-      boolean committed = client.commitStats(instant, 
context().parallelize(writeStatuses, 1), writeStats, Option.empty(), 
metaClient.getCommitActionType());
+      boolean committed = client.commitStats(instant, writeStats, 
Option.empty(), metaClient.getCommitActionType());
       Assertions.assertTrue(committed);
     }
     metaClient = HoodieTableMetaClient.reload(metaClient);
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieCleanerTestBase.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieCleanerTestBase.java
index 8d12c7da48d..0da2af63725 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieCleanerTestBase.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieCleanerTestBase.java
@@ -211,7 +211,7 @@ public class HoodieCleanerTestBase extends 
HoodieClientTestBase {
     HoodieCommitMetadata commitMeta = generateCommitMetadata(instantTime, 
partToFileIds);
     try (HoodieTableMetadataWriter metadataWriter = getMetadataWriter(config)) 
{
       metadataWriter.performTableServices(Option.of(instantTime));
-      metadataWriter.updateFromWriteStatuses(commitMeta, 
context.emptyHoodieData(), instantTime);
+      metadataWriter.update(commitMeta, instantTime);
       metaClient.getActiveTimeline().saveAsComplete(
           INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.INFLIGHT, 
HoodieTimeline.COMMIT_ACTION, instantTime),
           serializeCommitMetadata(metaClient.getCommitMetadataSerDe(), 
commitMeta));
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java
index 3c98a510317..6d7ca6d5182 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java
@@ -159,6 +159,9 @@ public class HoodieWriteStat implements Serializable {
   @Nullable
   private Long maxEventTime;
 
+  @Nullable
+  private String prevBaseFile;
+
   @Nullable
   private RuntimeStats runtimeStats;
 
@@ -327,6 +330,14 @@ public class HoodieWriteStat implements Serializable {
     this.fileSizeInBytes = fileSizeInBytes;
   }
 
+  public String getPrevBaseFile() {
+    return prevBaseFile;
+  }
+
+  public void setPrevBaseFile(String prevBaseFile) {
+    this.prevBaseFile = prevBaseFile;
+  }
+
   public Long getMinEventTime() {
     return minEventTime;
   }
@@ -370,9 +381,9 @@ public class HoodieWriteStat implements Serializable {
   @Override
   public String toString() {
     return "HoodieWriteStat{fileId='" + fileId + '\'' + ", path='" + path + 
'\'' + ", prevCommit='" + prevCommit
-        + '\'' + ", numWrites=" + numWrites + ", numDeletes=" + numDeletes + 
", numUpdateWrites=" + numUpdateWrites
-        + ", totalWriteBytes=" + totalWriteBytes + ", totalWriteErrors=" + 
totalWriteErrors + ", tempPath='" + tempPath
-        + '\'' + ", cdcStats='" + JsonUtils.toString(cdcStats)
+        + '\'' + ", prevBaseFile=" + prevBaseFile + '\'' + ", numWrites=" + 
numWrites + ", numDeletes=" + numDeletes
+        + ", numUpdateWrites=" + numUpdateWrites + ", totalWriteBytes=" + 
totalWriteBytes + ", totalWriteErrors="
+        + totalWriteErrors + ", tempPath='" + tempPath + '\'' + ", cdcStats='" 
+ JsonUtils.toString(cdcStats)
         + '\'' + ", partitionPath='" + partitionPath + '\'' + ", 
totalLogRecords=" + totalLogRecords
         + ", totalLogFilesCompacted=" + totalLogFilesCompacted + ", 
totalLogSizeCompacted=" + totalLogSizeCompacted
         + ", totalUpdatedRecordsCompacted=" + totalUpdatedRecordsCompacted + 
", totalLogBlocks=" + totalLogBlocks
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
index 8b8d43449c6..c23c5f42f19 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.common.table.log;
 
 import org.apache.hudi.common.model.DeleteRecord;
+import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodiePreCombineAvroRecordMerger;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordMerger;
@@ -42,16 +43,19 @@ import java.util.stream.Collectors;
 public class HoodieUnMergedLogRecordScanner extends 
AbstractHoodieLogRecordScanner {
 
   private final LogRecordScannerCallback callback;
+  private final RecordDeletionCallback recordDeletionCallback;
 
   private HoodieUnMergedLogRecordScanner(HoodieStorage storage, String 
basePath, List<String> logFilePaths, Schema readerSchema,
                                          String latestInstantTime, boolean 
reverseReader, int bufferSize,
-                                         LogRecordScannerCallback callback, 
Option<InstantRange> instantRange, InternalSchema internalSchema,
+                                         LogRecordScannerCallback callback, 
RecordDeletionCallback recordDeletionCallback,
+                                         Option<InstantRange> instantRange, 
InternalSchema internalSchema,
                                          boolean enableOptimizedLogBlocksScan, 
HoodieRecordMerger recordMerger,
                                          Option<HoodieTableMetaClient> 
hoodieTableMetaClientOption) {
     super(storage, basePath, logFilePaths, readerSchema, latestInstantTime, 
reverseReader, bufferSize, instantRange,
         false, true, Option.empty(), internalSchema, Option.empty(), 
enableOptimizedLogBlocksScan, recordMerger,
          hoodieTableMetaClientOption);
     this.callback = callback;
+    this.recordDeletionCallback = recordDeletionCallback;
   }
 
   /**
@@ -78,12 +82,16 @@ public class HoodieUnMergedLogRecordScanner extends 
AbstractHoodieLogRecordScann
     //       payload pointing into a shared, mutable (underlying) buffer we 
get a clean copy of
     //       it since these records will be put into queue of 
BoundedInMemoryExecutor.
     // Just call callback without merging
-    callback.apply(hoodieRecord.copy());
+    if (callback != null) {
+      callback.apply(hoodieRecord.copy());
+    }
   }
 
   @Override
   protected void processNextDeletedRecord(DeleteRecord deleteRecord) {
-    // no - op
+    if (recordDeletionCallback != null) {
+      recordDeletionCallback.apply(deleteRecord.getHoodieKey());
+    }
   }
 
   /**
@@ -95,6 +103,14 @@ public class HoodieUnMergedLogRecordScanner extends 
AbstractHoodieLogRecordScann
     void apply(HoodieRecord<?> record) throws Exception;
   }
 
+  /**
+   * A callback for log record scanner to consume deleted HoodieKeys.
+   */
+  @FunctionalInterface
+  public interface RecordDeletionCallback {
+    void apply(HoodieKey deletedKey);
+  }
+
   /**
    * Builder used to build {@code HoodieUnMergedLogRecordScanner}.
    */
@@ -110,6 +126,7 @@ public class HoodieUnMergedLogRecordScanner extends 
AbstractHoodieLogRecordScann
     private Option<InstantRange> instantRange = Option.empty();
     // specific configurations
     private LogRecordScannerCallback callback;
+    private RecordDeletionCallback recordDeletionCallback;
     private boolean enableOptimizedLogBlocksScan;
     private HoodieRecordMerger recordMerger = 
HoodiePreCombineAvroRecordMerger.INSTANCE;
     private HoodieTableMetaClient hoodieTableMetaClient;
@@ -173,6 +190,11 @@ public class HoodieUnMergedLogRecordScanner extends 
AbstractHoodieLogRecordScann
       return this;
     }
 
+    public Builder withRecordDeletionCallback(RecordDeletionCallback 
recordDeletionCallback) {
+      this.recordDeletionCallback = recordDeletionCallback;
+      return this;
+    }
+
     @Override
     public Builder withOptimizedLogBlocksScan(boolean 
enableOptimizedLogBlocksScan) {
       this.enableOptimizedLogBlocksScan = enableOptimizedLogBlocksScan;
@@ -197,7 +219,7 @@ public class HoodieUnMergedLogRecordScanner extends 
AbstractHoodieLogRecordScann
       ValidationUtils.checkArgument(recordMerger != null);
 
       return new HoodieUnMergedLogRecordScanner(storage, basePath, 
logFilePaths, readerSchema,
-          latestInstantTime, reverseReader, bufferSize, callback, instantRange,
+          latestInstantTime, reverseReader, bufferSize, callback, 
recordDeletionCallback, instantRange,
           internalSchema, enableOptimizedLogBlocksScan, recordMerger, 
Option.ofNullable(hoodieTableMetaClient));
     }
   }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseFileRecordParsingUtils.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseFileRecordParsingUtils.java
new file mode 100644
index 00000000000..ac2739237f2
--- /dev/null
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseFileRecordParsingUtils.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.util.FileFormatUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.VisibleForTesting;
+import org.apache.hudi.io.storage.HoodieIOFactory;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StoragePath;
+
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Stream;
+
+import static java.util.stream.Collectors.toList;
+
+public class BaseFileRecordParsingUtils {
+
+  /**
+   * Generates RLI Metadata records for base files.
+   * If the base file is added to a new file group, all record keys are treated 
as inserts.
+   * If a base file is added to an existing file group, we read previous base 
file in addition to the latest base file of interest. Find deleted records and 
generate RLI Metadata records
+   * for the same in addition to new insert records.
+   *
+   * @param basePath             base path of the table.
+   * @param writeStat            {@link HoodieWriteStat} of interest.
+   * @param writesFileIdEncoding fileID encoding for the table.
+   * @param instantTime          instant time of interest.
+   * @param storage              instance of {@link HoodieStorage}.
+   * @return Iterator of {@link HoodieRecord}s for RLI Metadata partition.
+   * @throws IOException
+   */
+  public static Iterator<HoodieRecord> 
generateRLIMetadataHoodieRecordsForBaseFile(String basePath,
+                                                                               
    HoodieWriteStat writeStat,
+                                                                               
    Integer writesFileIdEncoding,
+                                                                               
    String instantTime,
+                                                                               
    HoodieStorage storage) throws IOException {
+    String partition = writeStat.getPartitionPath();
+    String latestFileName = FSUtils.getFileNameFromPath(writeStat.getPath());
+    String fileId = FSUtils.getFileId(latestFileName);
+
+    Set<RecordStatus> recordStatuses = new HashSet<>();
+    recordStatuses.add(RecordStatus.INSERT);
+    recordStatuses.add(RecordStatus.DELETE);
+    // for RLI, we are only interested in INSERTS and DELETES
+    Map<RecordStatus, List<String>> recordStatusListMap = 
getRecordKeyStatuses(basePath, writeStat.getPartitionPath(), latestFileName, 
writeStat.getPrevBaseFile(), storage,
+        recordStatuses);
+    List<HoodieRecord> hoodieRecords = new ArrayList<>();
+    if (recordStatusListMap.containsKey(RecordStatus.INSERT)) {
+      
hoodieRecords.addAll(recordStatusListMap.get(RecordStatus.INSERT).stream()
+          .map(recordKey -> (HoodieRecord) 
HoodieMetadataPayload.createRecordIndexUpdate(recordKey, partition, fileId,
+              instantTime, writesFileIdEncoding)).collect(toList()));
+    }
+
+    if (recordStatusListMap.containsKey(RecordStatus.DELETE)) {
+      
hoodieRecords.addAll(recordStatusListMap.get(RecordStatus.DELETE).stream()
+          .map(recordKey -> 
HoodieMetadataPayload.createRecordIndexDelete(recordKey)).collect(toList()));
+    }
+
+    return hoodieRecords.iterator();
+  }
+
+  /**
+   * Fetch list of record keys deleted or updated in file referenced in the 
{@link HoodieWriteStat} passed.
+   *
+   * @param basePath  base path of the table.
+   * @param writeStat {@link HoodieWriteStat} instance of interest.
+   * @param storage   {@link HoodieStorage} instance of interest.
+   * @return list of record keys deleted or updated.
+   * @throws IOException
+   */
+  @VisibleForTesting
+  public static List<String> getRecordKeysDeletedOrUpdated(String basePath,
+                                                           HoodieWriteStat 
writeStat,
+                                                           HoodieStorage 
storage) throws IOException {
+    String latestFileName = FSUtils.getFileNameFromPath(writeStat.getPath());
+    Set<RecordStatus> recordStatuses = new HashSet<>();
+    recordStatuses.add(RecordStatus.UPDATE);
+    recordStatuses.add(RecordStatus.DELETE);
+    // for secondary index, we are interested in UPDATES and DELETES.
+    return getRecordKeyStatuses(basePath, writeStat.getPartitionPath(), 
latestFileName, writeStat.getPrevBaseFile(), storage,
+        recordStatuses).values().stream().flatMap((Function<List<String>, 
Stream<String>>) Collection::stream).collect(toList());
+  }
+
+  /**
+   * Computes the status (INSERT/UPDATE/DELETE) of record keys by comparing the 
latest base file against the previous base file (if any).
+   *
+   * @param basePath base path of the table.
+   * @param partition partition of interest.
+   * @param latestFileName file name of the latest base file.
+   * @param prevFileName file name of the previous base file, or null if the file 
group is new.
+   * @param storage  {@link HoodieStorage} instance of interest.
+   * @param recordStatusesOfInterest record statuses to include in the result.
+   * @return map from {@link RecordStatus} to the list of matching record keys.
+   * @throws IOException
+   */
+  @VisibleForTesting
+  public static Map<RecordStatus, List<String>> getRecordKeyStatuses(String 
basePath,
+                                                              String partition,
+                                                              String 
latestFileName,
+                                                              String 
prevFileName,
+                                                              HoodieStorage 
storage,
+                                                              
Set<RecordStatus> recordStatusesOfInterest) throws IOException {
+    Set<String> recordKeysFromLatestBaseFile = 
getRecordKeysFromBaseFile(storage, basePath, partition, latestFileName);
+    if (prevFileName == null) {
+      if (recordStatusesOfInterest.contains(RecordStatus.INSERT)) {
+        return Collections.singletonMap(RecordStatus.INSERT, new 
ArrayList<>(recordKeysFromLatestBaseFile));
+      } else {
+        // new base file for a new file group: every key is an insert, but inserts 
were not requested, so there is nothing to return.
+        return Collections.emptyMap();
+      }
+    } else {
+      // read from previous base file and find difference to also generate 
delete records.
+      // we will return updates and deletes from this code block
+      Set<String> recordKeysFromPreviousBaseFile = 
getRecordKeysFromBaseFile(storage, basePath, partition, prevFileName);
+      Map<RecordStatus, List<String>> toReturn = new 
HashMap<>(recordStatusesOfInterest.size());
+      if (recordStatusesOfInterest.contains(RecordStatus.DELETE)) {
+        toReturn.put(RecordStatus.DELETE, 
recordKeysFromPreviousBaseFile.stream()
+            .filter(recordKey -> {
+              // deleted record
+              return !recordKeysFromLatestBaseFile.contains(recordKey);
+            }).collect(toList()));
+      }
+
+      List<String> updates = new ArrayList<>();
+      List<String> inserts = new ArrayList<>();
+      // do one pass and collect list of inserts and updates.
+      recordKeysFromLatestBaseFile.stream().forEach(recordKey -> {
+        if (recordKeysFromPreviousBaseFile.contains(recordKey)) {
+          updates.add(recordKey);
+        } else {
+          inserts.add(recordKey);
+        }
+      });
+      if (recordStatusesOfInterest.contains(RecordStatus.UPDATE)) {
+        toReturn.put(RecordStatus.UPDATE, updates);
+      }
+      if (recordStatusesOfInterest.contains(RecordStatus.INSERT)) {
+        toReturn.put(RecordStatus.INSERT, inserts);
+      }
+      return toReturn;
+    }
+  }
+
+  private static Set<String> getRecordKeysFromBaseFile(HoodieStorage storage, 
String basePath, String partition, String fileName) throws IOException {
+    StoragePath dataFilePath = new StoragePath(basePath, 
StringUtils.isNullOrEmpty(partition) ? fileName : (partition + Path.SEPARATOR) 
+ fileName);
+    FileFormatUtils fileFormatUtils = 
HoodieIOFactory.getIOFactory(storage).getFileFormatUtils(HoodieFileFormat.PARQUET);
+    return fileFormatUtils.readRowKeys(storage, dataFilePath);
+  }
+
+  public enum RecordStatus {
+    INSERT,
+    UPDATE,
+    DELETE
+  }
+
+}
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 6f03b9e8209..ab63c153250 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -46,6 +46,9 @@ import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.EngineType;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.function.SerializableBiFunction;
+import org.apache.hudi.common.function.SerializablePairFunction;
+import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
@@ -54,6 +57,7 @@ import org.apache.hudi.common.model.HoodieDeltaWriteStat;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieIndexDefinition;
 import org.apache.hudi.common.model.HoodieIndexMetadata;
+import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodiePartitionMetadata;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -61,6 +65,7 @@ import 
org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
 import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
 import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
@@ -135,6 +140,7 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static java.util.stream.Collectors.toList;
 import static org.apache.hudi.avro.AvroSchemaUtils.resolveNullableSchema;
 import static org.apache.hudi.avro.HoodieAvroUtils.addMetadataFields;
 import static 
org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldSchemaFromWriteSchema;
@@ -364,6 +370,7 @@ public class HoodieTableMetadataUtil {
                                                                                
boolean isColumnStatsIndexEnabled,
                                                                                
int columnStatsIndexParallelism,
                                                                                
List<String> targetColumnsForColumnStatsIndex,
+                                                                               
Integer writesFileIdEncoding,
                                                                                
HoodieMetadataConfig metadataConfig) {
     final Map<String, HoodieData<HoodieRecord>> partitionToRecordsMap = new 
HashMap<>();
     final HoodieData<HoodieRecord> filesPartitionRecordsRDD = 
context.parallelize(
@@ -385,6 +392,10 @@ public class HoodieTableMetadataUtil {
       final HoodieData<HoodieRecord> partitionStatsRDD = 
convertMetadataToPartitionStatsRecords(commitMetadata, context, dataMetaClient, 
metadataConfig);
       
partitionToRecordsMap.put(MetadataPartitionType.PARTITION_STATS.getPartitionPath(),
 partitionStatsRDD);
     }
+    if (enabledPartitionTypes.contains(MetadataPartitionType.RECORD_INDEX)) {
+      
partitionToRecordsMap.put(MetadataPartitionType.RECORD_INDEX.getPartitionPath(),
 convertMetadataToRecordIndexRecords(context, commitMetadata, metadataConfig,
+          dataMetaClient, writesFileIdEncoding, instantTime));
+    }
     return partitionToRecordsMap;
   }
 
@@ -757,6 +768,163 @@ public class HoodieTableMetadataUtil {
         });
   }
 
+  @VisibleForTesting
+  public static HoodieData<HoodieRecord> 
convertMetadataToRecordIndexRecords(HoodieEngineContext engineContext,
+                                                                      
HoodieCommitMetadata commitMetadata,
+                                                                      
HoodieMetadataConfig metadataConfig,
+                                                                      
HoodieTableMetaClient dataTableMetaClient,
+                                                                      int 
writesFileIdEncoding,
+                                                                      String 
instantTime) {
+
+    List<HoodieWriteStat> allWriteStats = 
commitMetadata.getPartitionToWriteStats().values().stream()
+        .flatMap(Collection::stream).collect(Collectors.toList());
+
+    if (allWriteStats.isEmpty() || commitMetadata.getOperationType() == 
WriteOperationType.COMPACT) {
+      return engineContext.emptyHoodieData();
+    }
+
+    try {
+      int parallelism = Math.max(Math.min(allWriteStats.size(), 
metadataConfig.getRecordIndexMaxParallelism()), 1);
+      String basePath = dataTableMetaClient.getBasePath().toString();
+      HoodieFileFormat baseFileFormat = 
dataTableMetaClient.getTableConfig().getBaseFileFormat();
+      // RLI cannot support logs having inserts with current offering. So, 
lets validate that.
+      if (allWriteStats.stream().anyMatch(writeStat -> {
+        String fileName = FSUtils.getFileName(writeStat.getPath(), 
writeStat.getPartitionPath());
+        return FSUtils.isLogFile(fileName) && writeStat.getNumInserts() > 0;
+      })) {
+        throw new HoodieIOException("RLI cannot support logs having inserts 
with current offering. Would recommend disabling Record Level Index");
+      }
+
+      // we might need to set some additional variables if we need to process 
log files.
+      // for RLI and MOR table, we only care about log files if they contain 
any deletes. If not, all entries in logs are considered as updates, for which
+      // we do not need to generate new RLI record.
+      boolean anyLogFilesWithDeletes = 
allWriteStats.stream().anyMatch(writeStat -> {
+        String fileName = FSUtils.getFileName(writeStat.getPath(), 
writeStat.getPartitionPath());
+        return FSUtils.isLogFile(fileName) && writeStat.getNumDeletes() > 0;
+      });
+
+      Option<Schema> writerSchemaOpt = Option.empty();
+      if (anyLogFilesWithDeletes) { // if we have a log file w/ deletes.
+        writerSchemaOpt = tryResolveSchemaForTable(dataTableMetaClient);
+      }
+      int maxBufferSize = metadataConfig.getMaxReaderBufferSize();
+      StorageConfiguration storageConfiguration = 
dataTableMetaClient.getStorageConf();
+      Option<Schema> finalWriterSchemaOpt = writerSchemaOpt;
+      HoodieData<HoodieRecord> recordIndexRecords = 
engineContext.parallelize(allWriteStats, parallelism)
+          .flatMap(writeStat -> {
+            HoodieStorage storage = HoodieStorageUtils.getStorage(new 
StoragePath(writeStat.getPath()), storageConfiguration);
+            StoragePath fullFilePath = new 
StoragePath(dataTableMetaClient.getBasePath(), writeStat.getPath());
+            // handle base files
+            if 
(writeStat.getPath().endsWith(baseFileFormat.getFileExtension())) {
+              return 
BaseFileRecordParsingUtils.generateRLIMetadataHoodieRecordsForBaseFile(basePath,
 writeStat, writesFileIdEncoding, instantTime, storage);
+            } else if (FSUtils.isLogFile(fullFilePath)) {
+              // for logs, we only need to process log files containing deletes
+              if (writeStat.getNumDeletes() > 0) {
+                Set<String> deletedRecordKeys = 
getRecordKeys(fullFilePath.toString(), dataTableMetaClient,
+                    finalWriterSchemaOpt, maxBufferSize, instantTime, false, 
true);
+                return deletedRecordKeys.stream().map(recordKey -> 
HoodieMetadataPayload.createRecordIndexDelete(recordKey)).collect(toList()).iterator();
+              }
+              // ignore log file data blocks.
+              return new ArrayList<HoodieRecord>().iterator();
+            } else {
+              throw new HoodieIOException("Unsupported file type " + 
fullFilePath.toString() + " while generating MDT records");
+            }
+          });
+
+      // there are chances that same record key from data table has 2 entries 
(1 delete from older partition and 1 insert to newer partition)
+      // lets do reduce by key to ignore the deleted entry.
+      return reduceByKeys(recordIndexRecords, parallelism);
+    } catch (Exception e) {
+      throw new HoodieException("Failed to generate RLI records for metadata 
table", e);
+    }
+  }
+
+  /**
+   * There are chances that same record key from data table has 2 entries (1 
delete from older partition and 1 insert to newer partition)
+   * So, this method performs reduce by key to ignore the deleted entry.
+   * @param recordIndexRecords hoodie records after rli index lookup.
+   * @param parallelism parallelism to use.
+   * @return deduplicated record index records, preferring the non-deleted entry 
when a key has both an insert and a delete.
+   */
+  @VisibleForTesting
+  public static HoodieData<HoodieRecord> reduceByKeys(HoodieData<HoodieRecord> 
recordIndexRecords, int parallelism) {
+    return recordIndexRecords.mapToPair(
+            (SerializablePairFunction<HoodieRecord, HoodieKey, HoodieRecord>) 
t -> Pair.of(t.getKey(), t))
+        .reduceByKey((SerializableBiFunction<HoodieRecord, HoodieRecord, 
HoodieRecord>) (record1, record2) -> {
+          boolean isRecord1Deleted = record1.getData() instanceof 
EmptyHoodieRecordPayload;
+          boolean isRecord2Deleted = record2.getData() instanceof 
EmptyHoodieRecordPayload;
+          if (isRecord1Deleted && !isRecord2Deleted) {
+            return record2;
+          } else if (!isRecord1Deleted && isRecord2Deleted) {
+            return record1;
+          } else if (isRecord1Deleted && isRecord2Deleted) {
+            // let's delete just 1 of them
+            return record1;
+          } else {
+            throw new HoodieIOException("Two HoodieRecord updates to RLI is 
seen for same record key " + record2.getRecordKey() + ", record 1 : "
+                + record1.getData().toString() + ", record 2 : " + 
record2.getData().toString());
+          }
+        }, parallelism).values();
+  }
+
+  @VisibleForTesting
+  public static List<String> getRecordKeysDeletedOrUpdated(HoodieEngineContext 
engineContext,
+                                                           
HoodieCommitMetadata commitMetadata,
+                                                           
HoodieMetadataConfig metadataConfig,
+                                                           
HoodieTableMetaClient dataTableMetaClient,
+                                                           String instantTime) 
{
+
+    List<HoodieWriteStat> allWriteStats = 
commitMetadata.getPartitionToWriteStats().values().stream()
+        .flatMap(Collection::stream).collect(Collectors.toList());
+
+    if (allWriteStats.isEmpty()) {
+      return Collections.emptyList();
+    }
+
+    try {
+      int parallelism = Math.max(Math.min(allWriteStats.size(), 
metadataConfig.getRecordIndexMaxParallelism()), 1);
+      String basePath = dataTableMetaClient.getBasePath().toString();
+      HoodieFileFormat baseFileFormat = 
dataTableMetaClient.getTableConfig().getBaseFileFormat();
+      // SI cannot support logs having inserts with current offering. So, lets 
validate that.
+      if (allWriteStats.stream().anyMatch(writeStat -> {
+        String fileName = FSUtils.getFileName(writeStat.getPath(), 
writeStat.getPartitionPath());
+        return FSUtils.isLogFile(fileName) && writeStat.getNumInserts() > 0;
+      })) {
+        throw new HoodieIOException("Secondary index cannot support logs 
having inserts with current offering. Can you drop secondary index.");
+      }
+
+      // we might need to set some additional variables if we need to process 
log files.
+      boolean anyLogFiles = allWriteStats.stream().anyMatch(writeStat -> {
+        String fileName = FSUtils.getFileName(writeStat.getPath(), 
writeStat.getPartitionPath());
+        return FSUtils.isLogFile(fileName);
+      });
+      Option<Schema> writerSchemaOpt = Option.empty();
+      if (anyLogFiles) {
+        writerSchemaOpt = tryResolveSchemaForTable(dataTableMetaClient);
+      }
+      int maxBufferSize = metadataConfig.getMaxReaderBufferSize();
+      StorageConfiguration storageConfiguration = 
dataTableMetaClient.getStorageConf();
+      Option<Schema> finalWriterSchemaOpt = writerSchemaOpt;
+      return engineContext.parallelize(allWriteStats, parallelism)
+          .flatMap(writeStat -> {
+            HoodieStorage storage = HoodieStorageUtils.getStorage(new 
StoragePath(writeStat.getPath()), storageConfiguration);
+            StoragePath fullFilePath = new 
StoragePath(dataTableMetaClient.getBasePath(), writeStat.getPath());
+            // handle base files
+            if 
(writeStat.getPath().endsWith(baseFileFormat.getFileExtension())) {
+              return 
BaseFileRecordParsingUtils.getRecordKeysDeletedOrUpdated(basePath, writeStat, 
storage).iterator();
+            } else if (FSUtils.isLogFile(fullFilePath)) {
+              // for logs, every entry is either an update or a delete
+              return getRecordKeys(fullFilePath.toString(), 
dataTableMetaClient, finalWriterSchemaOpt, maxBufferSize, instantTime, true, 
true)
+                  .iterator();
+            } else {
+              throw new HoodieIOException("Found unsupported file type " + 
fullFilePath.toString() + ", while generating MDT records");
+            }
+          }).collectAsList();
+    } catch (Exception e) {
+      throw new HoodieException("Failed to fetch deleted record keys while 
preparing MDT records", e);
+    }
+  }
+
   private static void reAddLogFilesFromRollbackPlan(HoodieTableMetaClient 
dataTableMetaClient, String instantTime,
                                                     Map<String, Map<String, 
Long>> partitionToFilesMap) {
     InstantGenerator factory = dataTableMetaClient.getInstantGenerator();
@@ -1309,6 +1477,35 @@ public class HoodieTableMetadataUtil {
     return Collections.emptyList();
   }
 
+  @VisibleForTesting
+  public static Set<String> getRecordKeys(String filePath, 
HoodieTableMetaClient datasetMetaClient,
+                                          Option<Schema> writerSchemaOpt, int 
maxBufferSize,
+                                          String latestCommitTimestamp, 
boolean includeValidKeys,
+                                          boolean includeDeletedKeys) throws 
IOException {
+    if (writerSchemaOpt.isPresent()) {
+      // read log file records without merging
+      Set<String> allRecordKeys = new HashSet<>();
+      HoodieUnMergedLogRecordScanner.Builder builder = 
HoodieUnMergedLogRecordScanner.newBuilder()
+          .withStorage(datasetMetaClient.getStorage())
+          .withBasePath(datasetMetaClient.getBasePath())
+          .withLogFilePaths(Collections.singletonList(filePath))
+          .withBufferSize(maxBufferSize)
+          .withLatestInstantTime(latestCommitTimestamp)
+          .withReaderSchema(writerSchemaOpt.get())
+          .withTableMetaClient(datasetMetaClient);
+      if (includeValidKeys) {
+        builder.withLogRecordScannerCallback(record -> 
allRecordKeys.add(record.getRecordKey()));
+      }
+      if (includeDeletedKeys) {
+        builder.withRecordDeletionCallback(deletedKey -> 
allRecordKeys.add(deletedKey.getRecordKey()));
+      }
+      HoodieUnMergedLogRecordScanner scanner = builder.build();
+      scanner.scan();
+      return allRecordKeys;
+    }
+    return Collections.emptySet();
+  }
+
   /**
    * Does an upcast for {@link BigDecimal} instance to align it with 
scale/precision expected by
    * the {@link LogicalTypes.Decimal} Avro logical type
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieWriteStat.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieWriteStat.java
index a99e4848299..54d6b7a7ab7 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieWriteStat.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieWriteStat.java
@@ -53,6 +53,12 @@ public class TestHoodieWriteStat {
     writeStat.setPath(basePath, finalizeFilePath);
     assertEquals(finalizeFilePath, new StoragePath(basePath, 
writeStat.getPath()));
 
+    // test prev base file
+    StoragePath prevBaseFilePath = new StoragePath(partitionPath, 
FSUtils.makeBaseFileName(instantTime, "2-0-3", fileName,
+        HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().getFileExtension()));
+    writeStat.setPrevBaseFile(prevBaseFilePath.toString());
+    assertEquals(prevBaseFilePath.toString(), writeStat.getPrevBaseFile());
+
     // test for null tempFilePath
     writeStat = new HoodieWriteStat();
     writeStat.setPath(basePath, finalizeFilePath);
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java
index e773f3ed117..f28f811ba59 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java
@@ -21,7 +21,6 @@ package org.apache.hudi.sink.clustering;
 import org.apache.hudi.avro.model.HoodieClusteringGroup;
 import org.apache.hudi.avro.model.HoodieClusteringPlan;
 import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.data.HoodieListData;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.TableServiceType;
@@ -218,8 +217,7 @@ public class ClusteringCommitSink extends 
CleanFunction<ClusteringCommitEvent> {
     }
     // commit the clustering
     this.table.getMetaClient().reloadActiveTimeline();
-    this.writeClient.completeTableService(
-        TableServiceType.CLUSTER, writeMetadata.getCommitMetadata().get(), 
table, instant, 
Option.of(HoodieListData.lazy(writeMetadata.getWriteStatuses())));
+    this.writeClient.completeTableService(TableServiceType.CLUSTER, 
writeMetadata.getCommitMetadata().get(), table, instant);
 
     clusteringMetrics.updateCommitMetrics(instant, 
writeMetadata.getCommitMetadata().get());
     // whether to clean up the input base parquet files used for clustering
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java
 
b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java
index 1c9e2bb7579..d9db9cd51d1 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java
@@ -84,7 +84,7 @@ public class DataSourceInternalWriterHelper {
   public void commit(List<WriteStatus> writeStatuses) {
     try {
       List<HoodieWriteStat> writeStatList = 
writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList());
-      writeClient.commitStats(instantTime, 
writeClient.getEngineContext().parallelize(writeStatuses), writeStatList, 
Option.of(extraMetadata),
+      writeClient.commitStats(instantTime, writeStatList, 
Option.of(extraMetadata),
           CommitUtils.getCommitActionType(operationType, 
metaClient.getTableType()));
     } catch (Exception ioe) {
       throw new HoodieException(ioe.getMessage(), ioe);
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkConsistentBucketClustering.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkConsistentBucketClustering.java
index 7f2663d1051..a96dc752ee6 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkConsistentBucketClustering.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkConsistentBucketClustering.java
@@ -305,7 +305,7 @@ public class TestSparkConsistentBucketClustering extends 
HoodieSparkClientTestHa
     List<WriteStatus> writeStatues = writeData(writeTime, 2000, false);
     // Cannot schedule clustering if there is in-flight writer
     
Assertions.assertFalse(writeClient.scheduleClustering(Option.empty()).isPresent());
-    Assertions.assertTrue(writeClient.commitStats(writeTime, 
context.parallelize(writeStatues, 1), 
writeStatues.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
+    Assertions.assertTrue(writeClient.commitStats(writeTime, 
writeStatues.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
         Option.empty(), metaClient.getCommitActionType()));
     metaClient = HoodieTableMetaClient.reload(metaClient);
 
@@ -341,7 +341,7 @@ public class TestSparkConsistentBucketClustering extends 
HoodieSparkClientTestHa
     List<WriteStatus> writeStatues = writeClient.upsert(writeRecords, 
commitTime).collect();
     org.apache.hudi.testutils.Assertions.assertNoWriteErrors(writeStatues);
     if (doCommit) {
-      Assertions.assertTrue(writeClient.commitStats(commitTime, 
context.parallelize(writeStatues, 1), 
writeStatues.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
+      Assertions.assertTrue(writeClient.commitStats(commitTime, 
writeStatues.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
           Option.empty(), metaClient.getCommitActionType()));
     }
     metaClient = HoodieTableMetaClient.reload(metaClient);
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkSortAndSizeClustering.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkSortAndSizeClustering.java
index bc7bc5edfc5..96b4d425102 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkSortAndSizeClustering.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkSortAndSizeClustering.java
@@ -157,7 +157,7 @@ public class TestSparkSortAndSizeClustering extends 
HoodieSparkClientTestHarness
     org.apache.hudi.testutils.Assertions.assertNoWriteErrors(writeStatues);
 
     if (doCommit) {
-      Assertions.assertTrue(writeClient.commitStats(commitTime, 
context.parallelize(writeStatues, 1), 
writeStatues.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
+      Assertions.assertTrue(writeClient.commitStats(commitTime, 
writeStatues.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
           Option.empty(), metaClient.getCommitActionType()));
     }
 
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala
index e69c866ed18..15b52a68b8e 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala
@@ -322,9 +322,9 @@ class RecordLevelIndexTestBase extends 
HoodieSparkClientTestBase {
       writeConfig.getRecordIndexMinFileGroupCount, 
writeConfig.getRecordIndexMaxFileGroupCount,
       writeConfig.getRecordIndexGrowthFactor, 
writeConfig.getRecordIndexMaxFileGroupSizeBytes)
     assertEquals(estimatedFileGroupCount, 
getFileGroupCountForRecordIndex(writeConfig))
-    val prevDf = mergedDfList.last.drop("tip_history")
+    val prevDf = mergedDfList.last.drop("tip_history", "_hoodie_is_deleted")
     val nonMatchingRecords = readDf.drop("_hoodie_commit_time", 
"_hoodie_commit_seqno", "_hoodie_record_key",
-      "_hoodie_partition_path", "_hoodie_file_name", "tip_history")
+      "_hoodie_partition_path", "_hoodie_file_name", "tip_history", 
"_hoodie_is_deleted")
       .join(prevDf, prevDf.columns, "leftanti")
     assertEquals(0, nonMatchingRecords.count())
     assertEquals(readDf.count(), prevDf.count())
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
index 14d56db3160..5693f1804e1 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
@@ -71,7 +71,7 @@ class TestRecordLevelIndex extends RecordLevelIndexTestBase {
     val dataGen1 = HoodieTestDataGenerator.createTestGeneratorFirstPartition()
     val dataGen2 = HoodieTestDataGenerator.createTestGeneratorSecondPartition()
 
-    // batch1 inserts
+    // batch1 inserts (5 records)
     val instantTime1 = getNewInstantTime()
     val latestBatch = recordsToStrings(dataGen1.generateInserts(instantTime1, 
5)).asScala.toSeq
     var operation = INSERT_OPERATION_OPT_VAL
@@ -111,7 +111,7 @@ class TestRecordLevelIndex extends RecordLevelIndexTestBase 
{
       (HoodieMetadataConfig.RECORD_INDEX_ENABLE_PROP.key -> "true")
 
     val instantTime3 = getNewInstantTime()
-    // batch3. updates to partition2
+    // batch3. update 2 records from newly inserted records from commit 2 to 
partition2
     val latestBatch3 = 
recordsToStrings(dataGen2.generateUniqueUpdates(instantTime3, 2)).asScala.toSeq
     val latestBatchDf3 = 
spark.read.json(spark.sparkContext.parallelize(latestBatch3, 1))
     latestBatchDf3.cache()
@@ -240,7 +240,29 @@ class TestRecordLevelIndex extends 
RecordLevelIndexTestBase {
       .save(basePath)
     val prevDf = mergedDfList.last
     mergedDfList = mergedDfList :+ prevDf.except(deleteDf)
-    validateDataAndRecordIndices(hudiOpts)
+    validateDataAndRecordIndices(hudiOpts, deleteDf)
+  }
+
+  @ParameterizedTest
+  @EnumSource(classOf[HoodieTableType])
+  def testRLIForDeletesWithHoodieIsDeletedColumn(tableType: HoodieTableType): 
Unit = {
+    val hudiOpts = commonOpts + (DataSourceWriteOptions.TABLE_TYPE.key -> 
tableType.name())
+    val insertDf = doWriteAndValidateDataAndRecordIndex(hudiOpts,
+      operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Overwrite)
+    insertDf.cache()
+
+    val deleteBatch = 
recordsToStrings(dataGen.generateUniqueDeleteRecords(getNewInstantTime, 
1)).asScala
+    val deleteDf = 
spark.read.json(spark.sparkContext.parallelize(deleteBatch.toSeq, 1))
+    deleteDf.cache()
+    val recordKeyToDelete = 
deleteDf.collectAsList().get(0).getAs("_row_key").asInstanceOf[String]
+    deleteDf.write.format("org.apache.hudi")
+      .options(hudiOpts)
+      .mode(SaveMode.Append)
+      .save(basePath)
+    val prevDf = mergedDfList.last
+    mergedDfList = mergedDfList :+ prevDf.filter(row => 
row.getAs("_row_key").asInstanceOf[String] != recordKeyToDelete)
+    validateDataAndRecordIndices(hudiOpts, deleteDf)
   }
 
   @ParameterizedTest
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/offlinejob/HoodieOfflineJobTestBase.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/offlinejob/HoodieOfflineJobTestBase.java
index 2daafb37a1d..90aa040dac9 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/offlinejob/HoodieOfflineJobTestBase.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/offlinejob/HoodieOfflineJobTestBase.java
@@ -96,7 +96,7 @@ public class HoodieOfflineJobTestBase extends 
UtilitiesTestBase {
     org.apache.hudi.testutils.Assertions.assertNoWriteErrors(writeStatuses);
     if (doCommit) {
       List<HoodieWriteStat> writeStats = 
writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList());
-      boolean committed = client.commitStats(instant, 
context.parallelize(writeStatuses, 1), writeStats, Option.empty(), 
metaClient.getCommitActionType());
+      boolean committed = client.commitStats(instant, writeStats, 
Option.empty(), metaClient.getCommitActionType());
       Assertions.assertTrue(committed);
     }
     metaClient = HoodieTableMetaClient.reload(metaClient);


Reply via email to