This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 0e527404547 [HUDI-9405] Adding new writer apis and impl to Metadata 
writer to support streaming writes (#13286)
0e527404547 is described below

commit 0e52740454768f2e81449c6472d71fbf9496857f
Author: Lokesh Jain <[email protected]>
AuthorDate: Fri Jun 13 15:43:57 2025 +0530

    [HUDI-9405] Adding new writer apis and impl to Metadata writer to support 
streaming writes (#13286)
---
 .../metadata/HoodieBackedTableMetadataWriter.java  | 216 ++++++++++++++++++---
 .../hudi/metadata/HoodieTableMetadataWriter.java   |  45 +++++
 .../hudi/metadata/MetadataIndexGenerator.java      | 114 +++++++++++
 .../hudi/metadata/TestMetadataIndexGenerator.java  | 121 ++++++++++++
 .../FlinkHoodieBackedTableMetadataWriter.java      |  20 +-
 .../JavaHoodieBackedTableMetadataWriter.java       |  18 ++
 .../SparkHoodieBackedTableMetadataWriter.java      |  47 ++++-
 ...ieBackedTableMetadataWriterTableVersionSix.java |  23 ++-
 .../hudi/metadata/SparkMetadataWriterFactory.java  |   5 +
 .../java/org/apache/hudi/io/BaseTestHandle.java    |  69 +++++++
 .../io/KeyGeneratorForDataGeneratorRecords.java    |  41 ++++
 .../java/org/apache/hudi/io/TestAppendHandle.java  |  96 +++++++++
 .../java/org/apache/hudi/io/TestCreateHandle.java  |  84 ++++++++
 .../java/org/apache/hudi/io/TestMergeHandle.java   |  84 ++++++++
 .../apache/hudi/io/TestMetadataWriterCommit.java   | 148 ++++++++++++++
 15 files changed, 1103 insertions(+), 28 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 53c37db3ebd..33db4b9e822 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -27,6 +27,7 @@ import org.apache.hudi.avro.model.HoodieRestoreMetadata;
 import org.apache.hudi.avro.model.HoodieRestorePlan;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
 import org.apache.hudi.client.BaseHoodieWriteClient;
+import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.EngineType;
@@ -92,6 +93,7 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -146,6 +148,9 @@ public abstract class HoodieBackedTableMetadataWriter<I, O> 
implements HoodieTab
   // from the metadata payload schema.
   private static final String RECORD_KEY_FIELD_NAME = 
HoodieMetadataPayload.KEY_FIELD_NAME;
 
+  // tracks the list of MDT partitions that support writing to the metadata table in a streaming manner.
+  private static final List<MetadataPartitionType> 
STREAMING_WRITES_SUPPORTED_PARTITIONS = Arrays.asList(RECORD_INDEX);
+
   // Average size of a record saved within the record index.
   // Record index has a fixed size schema. This has been calculated based on 
experiments with default settings
   // for block size (1MB), compression (GZ) and disabling the hudi metadata 
fields.
@@ -165,7 +170,17 @@ public abstract class HoodieBackedTableMetadataWriter<I, 
O> implements HoodieTab
 
   // Is the MDT bootstrapped and ready to be read from
   boolean initialized = false;
+  boolean streamingWritesEnabled = false;
   private HoodieTableFileSystemView metadataView;
+  private Option<MetadataIndexGenerator> metadataIndexGenerator;
+
+  protected HoodieBackedTableMetadataWriter(StorageConfiguration<?> 
storageConf,
+                                            HoodieWriteConfig writeConfig,
+                                            HoodieFailedWritesCleaningPolicy 
failedWritesCleaningPolicy,
+                                            HoodieEngineContext engineContext,
+                                            Option<String> 
inflightInstantTimestamp) {
+    this(storageConf, writeConfig, failedWritesCleaningPolicy, engineContext, 
inflightInstantTimestamp, false);
+  }
 
   /**
    * Hudi backed table metadata writer.
@@ -175,12 +190,15 @@ public abstract class HoodieBackedTableMetadataWriter<I, 
O> implements HoodieTab
    * @param failedWritesCleaningPolicy Cleaning policy on failed writes
    * @param engineContext              Engine context
    * @param inflightInstantTimestamp   Timestamp of any instant in progress
+   * @param streamingWrites            For streaming writes, a long-lived write client is needed instead of one that is closed after each write.
+   *                                   This flag decides the lifecycle of the write client.
    */
   protected HoodieBackedTableMetadataWriter(StorageConfiguration<?> 
storageConf,
                                             HoodieWriteConfig writeConfig,
                                             HoodieFailedWritesCleaningPolicy 
failedWritesCleaningPolicy,
                                             HoodieEngineContext engineContext,
-                                            Option<String> 
inflightInstantTimestamp) {
+                                            Option<String> 
inflightInstantTimestamp,
+                                            boolean streamingWrites) {
     this.dataWriteConfig = writeConfig;
     this.engineContext = engineContext;
     this.storageConf = storageConf;
@@ -199,12 +217,20 @@ public abstract class HoodieBackedTableMetadataWriter<I, 
O> implements HoodieTab
       }
     }
     ValidationUtils.checkArgument(!initialized || this.metadata != null, "MDT 
Reader should have been opened post initialization");
+
+    this.metadataIndexGenerator = streamingWrites ? 
Option.of(initializeMetadataIndexGenerator()) : Option.empty();
+    this.streamingWritesEnabled = streamingWrites;
   }
 
   List<MetadataPartitionType> getEnabledPartitions(HoodieMetadataConfig 
metadataConfig, HoodieTableMetaClient metaClient) {
     return MetadataPartitionType.getEnabledPartitions(metadataConfig, 
metaClient);
   }
 
+  /**
+   * Returns the utilities for metadata index generation.
+   */
+  abstract MetadataIndexGenerator initializeMetadataIndexGenerator();
+
   private void mayBeReinitMetadataReader() {
     if (metadata == null || metadataMetaClient == null || 
metadata.getMetadataFileSystemView() == null) {
       initMetadataReader();
@@ -359,10 +385,9 @@ public abstract class HoodieBackedTableMetadataWriter<I, 
O> implements HoodieTab
   /**
    * Initialize the Metadata Table by listing files and partitions from the 
file system.
    *
-   * @param dataTableInstantTime     data table instant time on which the 
metadata table
-   *                                 is initialized upon
-   * @param partitionsToInit         list of MDT partitions to initialize
-   * @param inflightInstantTimestamp current action instant responsible for 
this initialization
+   * @param dataTableInstantTime     Timestamp to use for the commit
+   * @param partitionsToInit         List of MDT partitions to initialize
+   * @param inflightInstantTimestamp Current action instant responsible for 
this initialization
    */
   private boolean initializeFromFilesystem(String dataTableInstantTime, 
List<MetadataPartitionType> partitionsToInit, Option<String> 
inflightInstantTimestamp) throws IOException {
     Set<String> pendingDataInstants = getPendingDataInstants(dataMetaClient);
@@ -722,12 +747,13 @@ public abstract class HoodieBackedTableMetadataWriter<I, 
O> implements HoodieTab
 
   /**
    * Fetch record locations from FileSlice snapshot.
-   * @param engineContext context ot use.
-   * @param partitionFileSlicePairs list of pairs of partition and file slice.
+   *
+   * @param engineContext             context to use.
+   * @param partitionFileSlicePairs   list of pairs of partition and file 
slice.
    * @param recordIndexMaxParallelism parallelism to use.
-   * @param activeModule active module of interest.
-   * @param metaClient metaclient instance to use.
-   * @param dataWriteConfig write config to use.
+   * @param activeModule              active module of interest.
+   * @param metaClient                metaclient instance to use.
+   * @param dataWriteConfig           write config to use.
    * @return
    */
   private static <T> HoodieData<HoodieRecord> 
readRecordKeysFromFileSliceSnapshot(HoodieEngineContext engineContext,
@@ -846,7 +872,7 @@ public abstract class HoodieBackedTableMetadataWriter<I, O> 
implements HoodieTab
   /**
    * Function to find hoodie partitions and list files in them in parallel.
    *
-   * @param initializationTime Files which have a timestamp after this are 
neglected
+   * @param initializationTime  Files which have a timestamp after this are 
neglected
    * @param pendingDataInstants Pending instants on data set
    * @return List consisting of {@code DirectoryInfo} for each partition found.
    */
@@ -905,7 +931,7 @@ public abstract class HoodieBackedTableMetadataWriter<I, O> 
implements HoodieTab
   /**
    * Function to find hoodie partitions and list files in them in parallel 
from MDT.
    *
-   * @param initializationTime Files which have a timestamp after this are 
neglected
+   * @param initializationTime  Files which have a timestamp after this are 
neglected
    * @param pendingDataInstants Files coming from pending instants are 
neglected
    * @return List consisting of {@code DirectoryInfo} for each partition found.
    */
@@ -1093,6 +1119,124 @@ public abstract class 
HoodieBackedTableMetadataWriter<I, O> implements HoodieTab
     initializeFromFilesystem(instantTime, partitionTypes, Option.empty());
   }
 
+  public void startCommit(String instantTime) {
+    ValidationUtils.checkState(streamingWritesEnabled, "Streaming writes 
should be enabled for startCommit API");
+
+    if 
(!metadataMetaClient.getActiveTimeline().getCommitsTimeline().containsInstant(instantTime))
 {
+      // if this is a new commit being applied to metadata for the first time
+      LOG.info("New commit at {} being applied to MDT.", instantTime);
+    } else {
+      throw new HoodieMetadataException("Starting the same commit in Metadata table more than once without rolling back: " + instantTime);
+    }
+
+    // this is where we might instantiate the write client for the metadata table for the first time.
+    getWriteClient().startCommitForMetadataTable(metadataMetaClient, 
instantTime, HoodieTimeline.DELTA_COMMIT_ACTION);
+  }
+
+  @Override
+  public HoodieData<WriteStatus> 
streamWriteToMetadataPartitions(HoodieData<WriteStatus> writeStatus, String 
instantTime) {
+    List<MetadataPartitionType> partitionsToTag = new 
ArrayList<>(enabledPartitionTypes);
+    partitionsToTag.remove(FILES);
+    partitionsToTag.retainAll(STREAMING_WRITES_SUPPORTED_PARTITIONS);
+    if (partitionsToTag.isEmpty()) {
+      return engineContext.emptyHoodieData();
+    }
+
+    HoodieData<HoodieRecord> untaggedRecords = writeStatus.flatMap(
+        new 
MetadataIndexGenerator.WriteStatusBasedMetadataIndexMapper(partitionsToTag, 
dataWriteConfig));
+
+    // tag records w/ location
+    Pair<List<HoodieFileGroupId>, HoodieData<HoodieRecord>> 
hoodieFileGroupsToUpdateAndTaggedMdtRecords = 
tagRecordsWithLocationForStreamingWrites(untaggedRecords,
+        
partitionsToTag.stream().map(MetadataPartitionType::getPartitionPath).collect(Collectors.toSet()));
+
+    // write partial writes to MDT table (for those partitions where streaming 
writes are enabled)
+    HoodieData<WriteStatus> writeStatusCollection = 
convertEngineSpecificDataToHoodieData(streamWriteToMetadataTable(hoodieFileGroupsToUpdateAndTaggedMdtRecords,
 instantTime));
+    // dag not yet de-referenced. do not invoke any action on 
writeStatusCollection yet.
+    return writeStatusCollection;
+  }
+
+  /**
+   * Upsert the tagged records to metadata table in the streaming flow.
+   * @param fileGroupIdToTaggedRecords Pair of a {@link List} of {@link HoodieFileGroupId} referring to all file group IDs that could receive updates
+   *                                   and {@link HoodieData} of tagged {@link 
HoodieRecord}s.
+   * @param instantTime                Instant time of interest.
+   */
+  protected O streamWriteToMetadataTable(Pair<List<HoodieFileGroupId>, 
HoodieData<HoodieRecord>> fileGroupIdToTaggedRecords, String instantTime) {
+    throw new UnsupportedOperationException("Not yet implemented");
+  }
+
+  @Override
+  public void completeStreamingCommit(String instantTime, HoodieEngineContext 
context, List<HoodieWriteStat> partialWriteStats, HoodieCommitMetadata 
metadata) {
+    List<HoodieWriteStat> allWriteStats = new ArrayList<>(partialWriteStats);
+    // update metadata for the remaining partitions that do not have streaming writes support.
+    allWriteStats.addAll(prepareAndWriteToNonStreamingPartitions(context, 
metadata, instantTime).map(WriteStatus::getStat).collectAsList());
+    getWriteClient().commitStats(instantTime, allWriteStats, Option.empty(), 
HoodieTimeline.DELTA_COMMIT_ACTION,
+        Collections.emptyMap(), Option.empty());
+  }
+
+  private HoodieData<WriteStatus> 
prepareAndWriteToNonStreamingPartitions(HoodieEngineContext context, 
HoodieCommitMetadata commitMetadata, String instantTime) {
+    Set<String> partitionsToUpdate = 
getNonStreamingMetadataPartitionsToUpdate();
+    Map<String, HoodieData<HoodieRecord>> mdtPartitionsAndUnTaggedRecords = 
new BatchMetadataConversionFunction(instantTime, commitMetadata, 
partitionsToUpdate)
+        .convertMetadata();
+
+    HoodieData<HoodieRecord> untaggedMdtRecords = context.emptyHoodieData();
+    for (Map.Entry<String, HoodieData<HoodieRecord>> entry: 
mdtPartitionsAndUnTaggedRecords.entrySet()) {
+      untaggedMdtRecords = untaggedMdtRecords.union(entry.getValue());
+    }
+
+    // write to mdt table for non-streaming mdt partitions
+    Pair<List<HoodieFileGroupId>, HoodieData<HoodieRecord>> taggedRecords = 
tagRecordsWithLocationForStreamingWrites(untaggedMdtRecords, 
partitionsToUpdate);
+    HoodieData<WriteStatus> writeStatusHoodieData = 
convertEngineSpecificDataToHoodieData(streamWriteToMetadataTable(taggedRecords, 
instantTime));
+    return writeStatusHoodieData;
+  }
+
+  private Set<String> getNonStreamingMetadataPartitionsToUpdate() {
+    Set<String> toReturn = 
enabledPartitionTypes.stream().map(MetadataPartitionType::getPartitionPath).collect(Collectors.toSet());
+    STREAMING_WRITES_SUPPORTED_PARTITIONS.forEach(metadataPartitionType -> 
toReturn.remove(metadataPartitionType.getPartitionPath()));
+    return toReturn;
+  }
+
+  /**
+   * Returns a pair of the list of updated file group IDs in the partition and the tagged MDT records.
+   * @param untaggedRecords Untagged MDT records.
+   */
+  protected Pair<List<HoodieFileGroupId>, HoodieData<HoodieRecord>> 
tagRecordsWithLocationForStreamingWrites(HoodieData<HoodieRecord> 
untaggedRecords,
+                                                                               
                              Set<String> enabledMetadataPartitions) {
+    List<HoodieFileGroupId> updatedFileGroupIds = new ArrayList<>();
+    // Fetch latest file slices for all enabled MDT partitions
+    Map<String, List<FileSlice>> partitionToLatestFileSlices = new HashMap<>();
+    try (HoodieTableFileSystemView fsView = 
HoodieTableMetadataUtil.getFileSystemViewForMetadataTable(metadataMetaClient)) {
+      enabledMetadataPartitions.forEach(partitionName -> {
+        List<FileSlice> fileSlices =
+            
HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient, 
Option.ofNullable(fsView), partitionName);
+        if (fileSlices.isEmpty()) {
+          // scheduling or initializing of INDEX only initializes the file group and does not add a commit
+          // so if there are no committed file slices, look for inflight slices
+          
ValidationUtils.checkState(dataMetaClient.getTableConfig().getMetadataPartitionsInflight().contains(partitionName),
+              String.format("Partition %s should be part of inflight metadata 
partitions here %s", partitionName, 
dataMetaClient.getTableConfig().getMetadataPartitionsInflight()));
+          fileSlices = 
getPartitionLatestFileSlicesIncludingInflight(metadataMetaClient, 
Option.ofNullable(fsView), partitionName);
+        }
+        partitionToLatestFileSlices.put(partitionName, fileSlices);
+        final int fileGroupCount = fileSlices.size();
+        fileSlices.forEach(fileSlice -> 
updatedFileGroupIds.add(fileSlice.getFileGroupId()));
+        ValidationUtils.checkArgument(fileGroupCount > 0, 
String.format("FileGroup count for MDT partition %s should be > 0", 
partitionName));
+      });
+    }
+
+    HoodieData<HoodieRecord> taggedRecords = untaggedRecords.map(mdtRecord -> {
+      String mdtPartition = mdtRecord.getPartitionPath();
+      List<FileSlice> latestFileSlices = 
partitionToLatestFileSlices.get(mdtPartition);
+      FileSlice slice = 
latestFileSlices.get(HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(mdtRecord.getRecordKey(),
+          latestFileSlices.size()));
+      mdtRecord.unseal();
+      mdtRecord.setCurrentLocation(new 
HoodieRecordLocation(slice.getBaseInstantTime(), slice.getFileId()));
+      mdtRecord.seal();
+      return mdtRecord;
+    });
+
+    return Pair.of(updatedFileGroupIds, taggedRecords);
+  }
+
   /**
    * Update from {@code HoodieCommitMetadata}.
    *
@@ -1102,26 +1246,44 @@ public abstract class 
HoodieBackedTableMetadataWriter<I, O> implements HoodieTab
   @Override
   public void update(HoodieCommitMetadata commitMetadata, String instantTime) {
     mayBeReinitMetadataReader();
-    processAndCommit(instantTime, () -> {
+    processAndCommit(instantTime, new 
BatchMetadataConversionFunction(instantTime, commitMetadata, 
getMetadataPartitionsToUpdate()));
+    closeInternal();
+  }
+
+  /**
+   * Hoodie commit metadata conversion function for non-streaming writes.
+   */
+  private class BatchMetadataConversionFunction implements 
ConvertMetadataFunction {
+
+    private final HoodieCommitMetadata commitMetadata;
+    private final String instantTime;
+    private final Set<String> partitionsToUpdate;
+    public BatchMetadataConversionFunction(String instantTime, 
HoodieCommitMetadata commitMetadata, Set<String> partitionsToUpdate) {
+      this.instantTime = instantTime;
+      this.commitMetadata = commitMetadata;
+      this.partitionsToUpdate = partitionsToUpdate;
+    }
+
+    @Override
+    public Map<String, HoodieData<HoodieRecord>> convertMetadata() {
       Map<String, HoodieData<HoodieRecord>> partitionToRecordMap =
           HoodieTableMetadataUtil.convertMetadataToRecords(
               engineContext, dataWriteConfig, commitMetadata, instantTime, 
dataMetaClient, getTableMetadata(),
               dataWriteConfig.getMetadataConfig(),
-              getMetadataPartitionsToUpdate(), 
dataWriteConfig.getBloomFilterType(),
+              partitionsToUpdate, dataWriteConfig.getBloomFilterType(),
               dataWriteConfig.getBloomIndexParallelism(), 
dataWriteConfig.getWritesFileIdEncoding(), getEngineType(),
               Option.of(dataWriteConfig.getRecordMerger().getRecordType()));
 
       // Updates for record index are created by parsing the WriteStatus which 
is a hudi-client object. Hence, we cannot yet move this code
       // to the HoodieTableMetadataUtil class in hudi-common.
-      if 
(getMetadataPartitionsToUpdate().contains(RECORD_INDEX.getPartitionPath())) {
+      if (partitionsToUpdate.contains(RECORD_INDEX.getPartitionPath())) {
         HoodieData<HoodieRecord> additionalUpdates = 
getRecordIndexAdditionalUpserts(partitionToRecordMap.get(RECORD_INDEX.getPartitionPath()),
 commitMetadata);
         partitionToRecordMap.put(RECORD_INDEX.getPartitionPath(), 
partitionToRecordMap.get(RECORD_INDEX.getPartitionPath()).union(additionalUpdates));
       }
       updateExpressionIndexIfPresent(commitMetadata, instantTime, 
partitionToRecordMap);
       updateSecondaryIndexIfPresent(commitMetadata, partitionToRecordMap, 
instantTime);
       return partitionToRecordMap;
-    });
-    closeInternal();
+    }
   }
 
   /**
@@ -1369,10 +1531,12 @@ public abstract class 
HoodieBackedTableMetadataWriter<I, O> implements HoodieTab
    */
   protected abstract I 
convertHoodieDataToEngineSpecificData(HoodieData<HoodieRecord> records);
 
+  protected abstract HoodieData<WriteStatus> 
convertEngineSpecificDataToHoodieData(O records);
+
   protected void commitInternal(String instantTime, Map<String, 
HoodieData<HoodieRecord>> partitionRecordsMap, boolean isInitializing,
                                 Option<BulkInsertPartitioner> 
bulkInsertPartitioner) {
     ValidationUtils.checkState(metadataMetaClient != null, "Metadata table is 
not fully initialized yet.");
-    Pair<HoodieData<HoodieRecord>, List<HoodieFileGroupId>> result = 
prepRecords(partitionRecordsMap);
+    Pair<HoodieData<HoodieRecord>, List<HoodieFileGroupId>> result = 
tagRecordsWithLocation(partitionRecordsMap, isInitializing);
     HoodieData<HoodieRecord> preppedRecords = result.getKey();
     I preppedRecordInputs = 
convertHoodieDataToEngineSpecificData(preppedRecords);
 
@@ -1424,6 +1588,8 @@ public abstract class HoodieBackedTableMetadataWriter<I, 
O> implements HoodieTab
 
   protected abstract void upsertAndCommit(BaseHoodieWriteClient<?, I, ?, O> 
writeClient, String instantTime, I preppedRecordInputs);
 
+  protected abstract void upsertAndCommit(BaseHoodieWriteClient<?, I, ?, O> 
writeClient, String instantTime, I preppedRecordInputs, List<HoodieFileGroupId> 
fileGroupsIdsToUpdate);
+
   /**
    * Rolls back any failed writes if cleanup policy is EAGER. If any writes 
were cleaned up, the meta client is reloaded.
    * @param dataWriteConfig write config for the data table
@@ -1468,7 +1634,7 @@ public abstract class HoodieBackedTableMetadataWriter<I, 
O> implements HoodieTab
    * @return Pair of {@link HoodieData} of {@link HoodieRecord} referring to 
the prepared records to be ingested to metadata table and
    * List of {@link HoodieFileGroupId} containing all file groups from the 
partitions being written in metadata table.
    */
-  protected Pair<HoodieData<HoodieRecord>, List<HoodieFileGroupId>> 
prepRecords(Map<String, HoodieData<HoodieRecord>> partitionRecordsMap) {
+  protected Pair<HoodieData<HoodieRecord>, List<HoodieFileGroupId>> 
tagRecordsWithLocation(Map<String, HoodieData<HoodieRecord>> 
partitionRecordsMap, boolean isInitializing) {
     // The result set
     HoodieData<HoodieRecord> allPartitionRecords = 
engineContext.emptyHoodieData();
     try (HoodieTableFileSystemView fsView = 
HoodieTableMetadataUtil.getFileSystemViewForMetadataTable(metadataMetaClient)) {
@@ -1479,8 +1645,10 @@ public abstract class HoodieBackedTableMetadataWriter<I, 
O> implements HoodieTab
         List<FileSlice> fileSlices =
             
HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient, 
Option.ofNullable(fsView), partitionName);
         if (fileSlices.isEmpty()) {
-          // scheduling of INDEX only initializes the file group and not add 
commit
+          // scheduling or initializing of INDEX only initializes the file group and does not add a commit
           // so if there are no committed file slices, look for inflight slices
+          ValidationUtils.checkState(isInitializing || 
dataMetaClient.getTableConfig().getMetadataPartitionsInflight().contains(partitionName),
+              String.format("Partition %s should be part of inflight metadata 
partitions here %s", partitionName, 
dataMetaClient.getTableConfig().getMetadataPartitionsInflight()));
           fileSlices = 
getPartitionLatestFileSlicesIncludingInflight(metadataMetaClient, 
Option.ofNullable(fsView), partitionName);
         }
         final int fileGroupCount = fileSlices.size();
@@ -1547,11 +1715,11 @@ public abstract class 
HoodieBackedTableMetadataWriter<I, O> implements HoodieTab
       String metadataTableName = writeClient.getConfig().getTableName();
       boolean tableNameExists = StringUtils.nonEmpty(metadataTableName);
       String executionDurationMetricName = tableNameExists
-              ? String.format("%s.%s", metadataTableName, 
HoodieMetadataMetrics.TABLE_SERVICE_EXECUTION_DURATION)
-              : HoodieMetadataMetrics.TABLE_SERVICE_EXECUTION_DURATION;
+          ? String.format("%s.%s", metadataTableName, 
HoodieMetadataMetrics.TABLE_SERVICE_EXECUTION_DURATION)
+          : HoodieMetadataMetrics.TABLE_SERVICE_EXECUTION_DURATION;
       String executionStatusMetricName = tableNameExists
-              ? String.format("%s.%s", metadataTableName, 
HoodieMetadataMetrics.TABLE_SERVICE_EXECUTION_STATUS)
-              : HoodieMetadataMetrics.TABLE_SERVICE_EXECUTION_STATUS;
+          ? String.format("%s.%s", metadataTableName, 
HoodieMetadataMetrics.TABLE_SERVICE_EXECUTION_STATUS)
+          : HoodieMetadataMetrics.TABLE_SERVICE_EXECUTION_STATUS;
       long timeSpent = metadataTableServicesTimer.endTimer();
       metrics.ifPresent(m -> m.setMetric(executionDurationMetricName, 
timeSpent));
       if (allTableServicesExecutedSuccessfullyOrSkipped) {
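[Editor's note] The tagging helpers added above pin each MDT record to an existing file group by hashing its record key over the partition's latest file slices. A condensed sketch of that mapping, using the same HoodieTableMetadataUtil call as the patch; the record and fileSlices variables are placeholders for a tagged record and the partition's latest slices:

    // given the N latest file slices of an MDT partition, a record key deterministically picks one
    int idx = HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(record.getRecordKey(), fileSlices.size());
    FileSlice slice = fileSlices.get(idx);
    record.unseal();
    record.setCurrentLocation(new HoodieRecordLocation(slice.getBaseInstantTime(), slice.getFileId()));
    record.seal();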
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java
index 116d76bbc0b..8ffdb0a2720 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java
@@ -22,8 +22,11 @@ import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
 import org.apache.hudi.avro.model.HoodieRestoreMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.VisibleForTesting;
 
@@ -36,6 +39,48 @@ import java.util.List;
  */
 public interface HoodieTableMetadataWriter<I,O> extends Serializable, 
AutoCloseable {
 
+  /**
+   * Starts a new commit in metadata table for streaming write flow.
+   *
+   * @param instantTime The instant time of interest.
+   */
+  void startCommit(String instantTime);
+
+  /**
+   * Prepares records and writes them to the MDT for all eligible partitions except the FILES partition.
+   *
+   * <p>This is used in streaming writes, where the data table write statuses are maintained as HoodieData;
+   * it prepares records and writes them to the MDT partitions (except FILES).
+   *
+   * <p>Caution: no actions should be triggered on the incoming HoodieData&lt;WriteStatus&gt;
+   * or on the writes to the metadata table. The caller is expected to trigger #collect just once for both sets of HoodieData&lt;WriteStatus&gt;.
+   *
+   * @param writeStatus {@link HoodieData} of {@link WriteStatus} from data 
table writes.
+   * @param instantTime  instant time of interest.
+   *
+   * @return {@link HoodieData} of {@link WriteStatus} for writes to metadata 
table.
+   */
+  HoodieData<WriteStatus> 
streamWriteToMetadataPartitions(HoodieData<WriteStatus> writeStatus, String 
instantTime);
+
+  /**
+   * Completes the commit in the streaming write flow.
+   *
+   * <p>The streaming write workflow:
+   *
+   * <ol>
+   *   <li>writes the inputs in the data table to all data files;</li>
+   *   <li>updates metadata table partitions in {@link #streamWriteToMetadataPartitions};</li>
+   *   <li>finalizes the writes for the data table;</li>
+   *   <li>the caller invokes this method to update the metadata table for the remaining
+   *   non-streaming partitions (including FILES), and finally completes the
+   *   commit in the metadata table (see the sketch below).</li>
+   * </ol>
+   *
+   * @param instantTime       Instant time of interest.
+   * @param context           The engine context {@link HoodieEngineContext}.
+   * @param partialWriteStats List of {@link HoodieWriteStat} for partial/streaming writes to the metadata table completed so far.
+   * @param commitMetadata    The data table {@link HoodieCommitMetadata}.
+   */
+  void completeStreamingCommit(String instantTime, HoodieEngineContext 
context, List<HoodieWriteStat> partialWriteStats, HoodieCommitMetadata 
commitMetadata);
+
   /**
    * Builds the given metadata partitions to create index.
    *
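[Editor's note] To make the intended call order concrete, here is a minimal caller-side sketch of the streaming flow defined by this interface. It assumes the Spark engine (via SparkMetadataWriterFactory.createWithStreamingWrites added in this patch); writeStatuses and commitMetadata are placeholders for the data table's write results, not identifiers from this patch:

    HoodieTableMetadataWriter mdtWriter = SparkMetadataWriterFactory.createWithStreamingWrites(
        storageConf, writeConfig, HoodieFailedWritesCleaningPolicy.EAGER, engineContext, Option.empty());
    mdtWriter.startCommit(instantTime);
    // lazy: no action is triggered on the incoming or outgoing HoodieData here
    HoodieData<WriteStatus> mdtStatuses = mdtWriter.streamWriteToMetadataPartitions(writeStatuses, instantTime);
    // the single #collect expected by the contract above
    List<HoodieWriteStat> partialStats = mdtStatuses.map(WriteStatus::getStat).collectAsList();
    // writes the non-streaming partitions (including FILES) and completes the delta commit
    mdtWriter.completeStreamingCommit(instantTime, engineContext, partialStats, commitMetadata);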
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/MetadataIndexGenerator.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/MetadataIndexGenerator.java
new file mode 100644
index 00000000000..f8809d31291
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/MetadataIndexGenerator.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.function.SerializableFunction;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordDelegate;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieMetadataException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import static org.apache.hudi.metadata.MetadataPartitionType.RECORD_INDEX;
+
+/**
+ * For now this is a placeholder to generate all MDT records in one place.
+ * Once <a href="https://github.com/apache/hudi/pull/13226">Refactor MDT 
update logic with Indexer</a> is landed,
+ * we will leverage the new abstraction to generate MDT records.
+ */
+public class MetadataIndexGenerator implements Serializable {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(MetadataIndexGenerator.class);
+
+  /**
+   * MDT record transformation utility. This function is expected to be invoked from a mapPartitions call, where one Spark task receives
+   * one WriteStatus as input, and the output contains prepared MDT table records for all eligible partitions that can operate on a
+   * single WriteStatus instance only.
+   */
+  static class WriteStatusBasedMetadataIndexMapper implements 
SerializableFunction<WriteStatus, Iterator<HoodieRecord>> {
+    private final List<MetadataPartitionType> enabledPartitionTypes;
+    private final HoodieWriteConfig dataWriteConfig;
+
+    public WriteStatusBasedMetadataIndexMapper(List<MetadataPartitionType> 
enabledPartitionTypes, HoodieWriteConfig dataWriteConfig) {
+      this.enabledPartitionTypes = enabledPartitionTypes;
+      this.dataWriteConfig = dataWriteConfig;
+    }
+
+    @Override
+    public Iterator<HoodieRecord> apply(WriteStatus writeStatus) throws 
Exception {
+      List<HoodieRecord> allRecords = new ArrayList<>();
+      if (enabledPartitionTypes.contains(RECORD_INDEX)) {
+        allRecords.addAll(processWriteStatusForRLI(writeStatus, 
dataWriteConfig));
+      }
+      // yet to add support for more partitions.
+      // bloom filter
+      // secondary index
+      // expression index.
+      return allRecords.iterator();
+    }
+  }
+
+  protected static List<HoodieRecord> processWriteStatusForRLI(WriteStatus 
writeStatus, HoodieWriteConfig dataWriteConfig) {
+    List<HoodieRecord> allRecords = new ArrayList<>();
+    for (HoodieRecordDelegate recordDelegate : 
writeStatus.getWrittenRecordDelegates()) {
+      if (!writeStatus.isErrored(recordDelegate.getHoodieKey())) {
+        if (recordDelegate.getIgnoreIndexUpdate()) {
+          continue;
+        }
+        HoodieRecord hoodieRecord;
+        Option<HoodieRecordLocation> newLocation = 
recordDelegate.getNewLocation();
+        if (newLocation.isPresent()) {
+          if (recordDelegate.getCurrentLocation().isPresent()) {
+            // This is an update, no need to update index if the location has 
not changed
+            // newLocation should have the same fileID as currentLocation. The 
instantTimes differ as newLocation's
+            // instantTime refers to the current commit which was completed.
+            if 
(!recordDelegate.getCurrentLocation().get().getFileId().equals(newLocation.get().getFileId()))
 {
+              final String msg = String.format("Detected update in location of 
record with key %s from %s to %s. The fileID should not change.",
+                  recordDelegate, recordDelegate.getCurrentLocation().get(), 
newLocation.get());
+              LOG.error(msg);
+              throw new HoodieMetadataException(msg);
+            }
+            // for updates, we can skip updating RLI partition in MDT
+          } else {
+            // Insert new record case
+            hoodieRecord = HoodieMetadataPayload.createRecordIndexUpdate(
+                recordDelegate.getRecordKey(), 
recordDelegate.getPartitionPath(),
+                newLocation.get().getFileId(), 
newLocation.get().getInstantTime(), dataWriteConfig.getWritesFileIdEncoding());
+            allRecords.add(hoodieRecord);
+          }
+        } else {
+          // Delete existing index for a deleted record
+          hoodieRecord = 
HoodieMetadataPayload.createRecordIndexDelete(recordDelegate.getRecordKey());
+          allRecords.add(hoodieRecord);
+        }
+      }
+    }
+    return allRecords;
+  }
+}
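[Editor's note] In the streaming path, this mapper is applied once per WriteStatus via flatMap (see streamWriteToMetadataPartitions in HoodieBackedTableMetadataWriter above). A minimal sketch; writeStatuses stands in for the data table's HoodieData<WriteStatus>:

    MetadataIndexGenerator.WriteStatusBasedMetadataIndexMapper mapper =
        new MetadataIndexGenerator.WriteStatusBasedMetadataIndexMapper(
            Collections.singletonList(MetadataPartitionType.RECORD_INDEX), dataWriteConfig);
    // one untagged RLI record per successful insert or delete; location-unchanged updates are skipped
    HoodieData<HoodieRecord> untaggedMdtRecords = writeStatuses.flatMap(mapper);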
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestMetadataIndexGenerator.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestMetadataIndexGenerator.java
new file mode 100644
index 00000000000..188e93099ff
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestMetadataIndexGenerator.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import org.apache.hudi.avro.model.HoodieRecordIndexInfo;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.testutils.InProcessTimeGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestMetadataIndexGenerator extends HoodieCommonTestHarness {
+
+  @Test
+  public void testRLIIndexMapperWithInsertsAndUpserts() throws Exception {
+    // Generate write status with inserts and upserts
+    WriteStatus writeStatus = new WriteStatus(true, 0.0d);
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+    List<HoodieRecord> records = dataGenerator.generateInserts("001", 10);
+    for (int i = 0; i < records.size(); i++) {
+      HoodieRecord record = records.get(i);
+      String fileId = UUID.randomUUID().toString();
+      if (i < 5) {
+        // 5 updates
+        record.setCurrentLocation(new 
HoodieRecordLocation(InProcessTimeGenerator.createNewInstantTime(), fileId));
+      }
+      // 5 inserts
+      record.setNewLocation(new 
HoodieRecordLocation(InProcessTimeGenerator.createNewInstantTime(), fileId));
+      writeStatus.markSuccess(record, Option.empty());
+    }
+
+    // Generate RLI records
+    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
+        .withPath("random")
+        .build();
+    MetadataIndexGenerator.WriteStatusBasedMetadataIndexMapper 
writeStatusBasedMetadataIndexMapper = new 
MetadataIndexGenerator.WriteStatusBasedMetadataIndexMapper(
+        Collections.singletonList(MetadataPartitionType.RECORD_INDEX), 
writeConfig);
+    Iterator<HoodieRecord> rliRecords = 
writeStatusBasedMetadataIndexMapper.apply(writeStatus);
+    AtomicInteger totalRLIRecords = new AtomicInteger();
+    rliRecords.forEachRemaining(rliRecord -> {
+      totalRLIRecords.getAndIncrement();
+
+      // verify RLI metadata
+      HoodieRecord<HoodieMetadataPayload> record = rliRecord;
+      assertEquals(MetadataPartitionType.RECORD_INDEX.getPartitionPath(), 
record.getKey().getPartitionPath());
+      HoodieRecordIndexInfo recordIndexInfo = 
record.getData().recordIndexMetadata;
+      assertNotNull(recordIndexInfo);
+      assertTrue(StringUtils.nonEmpty(recordIndexInfo.getPartitionName()));
+      assertNotNull(recordIndexInfo.getFileIdHighBits());
+      assertNotNull(recordIndexInfo.getFileIdLowBits());
+    });
+    // RLI is only updated for inserts
+    assertEquals(5, totalRLIRecords.get());
+  }
+
+  @Test
+  public void testRLIIndexGeneratorWithDeletes() throws Exception {
+    // Generate write status with deletes
+    WriteStatus writeStatus = new WriteStatus(true, 0.0d);
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+    List<HoodieRecord> records = dataGenerator.generateInserts("001", 10);
+    for (int i = 0; i < records.size(); i++) {
+      HoodieRecord record = records.get(i);
+      String fileId = UUID.randomUUID().toString();
+      record.setCurrentLocation(new 
HoodieRecordLocation(InProcessTimeGenerator.createNewInstantTime(), fileId));
+      writeStatus.markSuccess(record, Option.empty());
+    }
+
+    // Generate RLI records
+    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
+        .withPath("random")
+        .build();
+    MetadataIndexGenerator.WriteStatusBasedMetadataIndexMapper 
writeStatusBasedMetadataIndexMapper = new 
MetadataIndexGenerator.WriteStatusBasedMetadataIndexMapper(
+        Collections.singletonList(MetadataPartitionType.RECORD_INDEX), 
writeConfig);
+    Iterator<HoodieRecord> rliRecords = 
writeStatusBasedMetadataIndexMapper.apply(writeStatus);
+    AtomicInteger totalRLIRecords = new AtomicInteger();
+    rliRecords.forEachRemaining(rliRecord -> {
+      totalRLIRecords.getAndIncrement();
+
+      // verify RLI record payload is EmptyHoodieRecordPayload
+      HoodieRecord<EmptyHoodieRecordPayload> record = rliRecord;
+      assertEquals(MetadataPartitionType.RECORD_INDEX.getPartitionPath(), 
record.getKey().getPartitionPath());
+      assertTrue(record.getData() instanceof EmptyHoodieRecordPayload);
+    });
+    // RLI delete records are generated for all deleted records
+    assertEquals(10, totalRLIRecords.get());
+  }
+}
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
index 9ebf0753964..e1911da1f67 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
@@ -22,9 +22,11 @@ import org.apache.hudi.client.BaseHoodieWriteClient;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.data.HoodieListData;
 import org.apache.hudi.common.engine.EngineType;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieIndexDefinition;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -110,6 +112,11 @@ public class FlinkHoodieBackedTableMetadataWriter extends 
HoodieBackedTableMetad
     return records.collectAsList();
   }
 
+  @Override
+  protected HoodieData<WriteStatus> 
convertEngineSpecificDataToHoodieData(List<WriteStatus> records) {
+    return HoodieListData.lazy(records);
+  }
+
   @Override
   protected void bulkCommit(String instantTime, String partitionName, 
HoodieData<HoodieRecord> records, int fileGroupCount) {
     // TODO: functional and secondary index are not supported with Flink yet, 
but we should fix the partition name when we support them.
@@ -120,7 +127,7 @@ public class FlinkHoodieBackedTableMetadataWriter extends 
HoodieBackedTableMetad
   protected void commitInternal(String instantTime, Map<String, 
HoodieData<HoodieRecord>> partitionRecordsMap, boolean isInitializing,
                                 Option<BulkInsertPartitioner> 
bulkInsertPartitioner) {
     ValidationUtils.checkState(metadataMetaClient != null, "Metadata table is 
not fully initialized yet.");
-    HoodieData<HoodieRecord> preppedRecords = 
prepRecords(partitionRecordsMap).getKey();
+    HoodieData<HoodieRecord> preppedRecords = 
tagRecordsWithLocation(partitionRecordsMap, isInitializing).getKey();
     List<HoodieRecord> preppedRecordList = preppedRecords.collectAsList();
 
     //  Flink engine does not optimize initialCommit to MDT as bulk insert is 
not yet supported
@@ -209,6 +216,17 @@ public class FlinkHoodieBackedTableMetadataWriter extends 
HoodieBackedTableMetad
     writeClient.commit(instantTime, writeStatusJavaRDD);
   }
 
+  @Override
+  protected void upsertAndCommit(BaseHoodieWriteClient<?, List<HoodieRecord>, 
?, List<WriteStatus>> writeClient, String instantTime, List<HoodieRecord> 
preppedRecordInputs,
+                                 List<HoodieFileGroupId> 
fileGroupsIdsToUpdate) {
+    throw new UnsupportedOperationException("Not implemented for Flink engine 
yet");
+  }
+
+  @Override
+  MetadataIndexGenerator initializeMetadataIndexGenerator() {
+    throw new UnsupportedOperationException("Streaming writes are not 
supported for Flink");
+  }
+
   @Override
   protected EngineType getEngineType() {
     return EngineType.FLINK;
diff --git 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/metadata/JavaHoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/metadata/JavaHoodieBackedTableMetadataWriter.java
index c820f4539d5..149ddf6407f 100644
--- 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/metadata/JavaHoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/metadata/JavaHoodieBackedTableMetadataWriter.java
@@ -22,9 +22,11 @@ import org.apache.hudi.client.BaseHoodieWriteClient;
 import org.apache.hudi.client.HoodieJavaWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.data.HoodieListData;
 import org.apache.hudi.common.engine.EngineType;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieIndexDefinition;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -61,6 +63,11 @@ public class JavaHoodieBackedTableMetadataWriter extends 
HoodieBackedTableMetada
     super(storageConf, writeConfig, failedWritesCleaningPolicy, engineContext, 
inflightInstantTimestamp);
   }
 
+  @Override
+  MetadataIndexGenerator initializeMetadataIndexGenerator() {
+    throw new UnsupportedOperationException("Streaming writes are not 
supported for Java");
+  }
+
   public static HoodieTableMetadataWriter create(StorageConfiguration<?> conf,
                                                  HoodieWriteConfig writeConfig,
                                                  HoodieEngineContext context,
@@ -102,6 +109,11 @@ public class JavaHoodieBackedTableMetadataWriter extends 
HoodieBackedTableMetada
     return records.collectAsList();
   }
 
+  @Override
+  protected HoodieData<WriteStatus> 
convertEngineSpecificDataToHoodieData(List<WriteStatus> records) {
+    return HoodieListData.lazy(records);
+  }
+
   @Override
   protected void bulkCommit(String instantTime, String partitionName, 
HoodieData<HoodieRecord> records, int fileGroupCount) {
     commitInternal(instantTime, Collections.singletonMap(partitionName, 
records), true, Option.of(new JavaHoodieMetadataBulkInsertPartitioner()));
@@ -120,6 +132,12 @@ public class JavaHoodieBackedTableMetadataWriter extends 
HoodieBackedTableMetada
     writeClient.commit(instantTime, writeStatusJavaRDD);
   }
 
+  @Override
+  protected void upsertAndCommit(BaseHoodieWriteClient<?, List<HoodieRecord>, 
?, List<WriteStatus>> writeClient, String instantTime, List<HoodieRecord> 
preppedRecordInputs,
+                                 List<HoodieFileGroupId> 
fileGroupsIdsToUpdate) {
+    throw new UnsupportedOperationException("Not implemented for Java engine 
yet");
+  }
+
   @Override
   protected BaseHoodieWriteClient<?, List<HoodieRecord>, ?, List<WriteStatus>> 
initializeWriteClient() {
     return new HoodieJavaWriteClient(engineContext, metadataWriteConfig);
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
index 6a4088f0879..a98d7ae47d6 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
@@ -33,6 +33,7 @@ import org.apache.hudi.common.metrics.Registry;
 import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieIndexDefinition;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
@@ -116,7 +117,16 @@ public class SparkHoodieBackedTableMetadataWriter extends 
HoodieBackedTableMetad
                                        HoodieFailedWritesCleaningPolicy 
failedWritesCleaningPolicy,
                                        HoodieEngineContext engineContext,
                                        Option<String> 
inflightInstantTimestamp) {
-    super(hadoopConf, writeConfig, failedWritesCleaningPolicy, engineContext, 
inflightInstantTimestamp);
+    this(hadoopConf, writeConfig, failedWritesCleaningPolicy, engineContext, 
inflightInstantTimestamp, false);
+  }
+
+  SparkHoodieBackedTableMetadataWriter(StorageConfiguration<?> hadoopConf,
+                                       HoodieWriteConfig writeConfig,
+                                       HoodieFailedWritesCleaningPolicy 
failedWritesCleaningPolicy,
+                                       HoodieEngineContext engineContext,
+                                       Option<String> inflightInstantTimestamp,
+                                       boolean streamingWrites) {
+    super(hadoopConf, writeConfig, failedWritesCleaningPolicy, engineContext, 
inflightInstantTimestamp, streamingWrites);
   }
 
   @Override
@@ -147,7 +157,23 @@ public class SparkHoodieBackedTableMetadataWriter extends 
HoodieBackedTableMetad
   }
 
   @Override
-  protected void bulkInsertAndCommit(BaseHoodieWriteClient<?, 
JavaRDD<HoodieRecord>, ?, JavaRDD<WriteStatus>> writeClient, String 
instantTime, JavaRDD<HoodieRecord> preppedRecordInputs,
+  protected HoodieData<WriteStatus> 
convertEngineSpecificDataToHoodieData(JavaRDD<WriteStatus> records) {
+    return HoodieJavaRDD.of(records);
+  }
+
+  @Override
+  public JavaRDD<WriteStatus> 
streamWriteToMetadataTable(Pair<List<HoodieFileGroupId>, 
HoodieData<HoodieRecord>> fileGroupIdToTaggedRecords, String instantTime) {
+    JavaRDD<HoodieRecord> mdtRecords = 
HoodieJavaRDD.getJavaRDD(fileGroupIdToTaggedRecords.getValue());
+    engineContext.setJobStatus(this.getClass().getSimpleName(), 
String.format("Upserting with instant %s into metadata table %s", instantTime, 
metadataWriteConfig.getTableName()));
+    JavaRDD<WriteStatus> metadataWriteStatusesSoFar = 
getSparkWriteClient(Option.empty()).upsertPreppedRecords(mdtRecords, 
instantTime, Option.of(
+        fileGroupIdToTaggedRecords.getKey()));
+    return metadataWriteStatusesSoFar;
+  }
+
+  @Override
+  protected void bulkInsertAndCommit(BaseHoodieWriteClient<?, 
JavaRDD<HoodieRecord>, ?, JavaRDD<WriteStatus>> writeClient,
+                                     String instantTime,
+                                     JavaRDD<HoodieRecord> preppedRecordInputs,
                                      Option<BulkInsertPartitioner> 
bulkInsertPartitioner) {
     JavaRDD<WriteStatus> writeStatusJavaRDD = 
writeClient.bulkInsertPreppedRecords(preppedRecordInputs, instantTime, 
bulkInsertPartitioner);
     writeClient.commit(instantTime, writeStatusJavaRDD, Option.empty(), 
DELTA_COMMIT_ACTION, Collections.emptyMap());
@@ -159,6 +185,15 @@ public class SparkHoodieBackedTableMetadataWriter extends 
HoodieBackedTableMetad
     writeClient.commit(instantTime, writeStatusJavaRDD, Option.empty(), 
DELTA_COMMIT_ACTION, Collections.emptyMap());
   }
 
+  @Override
+  protected void upsertAndCommit(BaseHoodieWriteClient<?, 
JavaRDD<HoodieRecord>, ?, JavaRDD<WriteStatus>> writeClient,
+                                 String instantTime,
+                                 JavaRDD<HoodieRecord> preppedRecordInputs,
+                                 List<HoodieFileGroupId> 
fileGroupsIdsToUpdate) {
+    JavaRDD<WriteStatus> writeStatusJavaRDD = 
getSparkWriteClient(Option.of(writeClient)).upsertPreppedRecords(preppedRecordInputs,
 instantTime, Option.of(fileGroupsIdsToUpdate));
+    writeClient.commit(instantTime, writeStatusJavaRDD, Option.empty(), 
DELTA_COMMIT_ACTION, Collections.emptyMap());
+  }
+
   @Override
   protected void bulkCommit(
       String instantTime, String partitionName, HoodieData<HoodieRecord> 
records,
@@ -241,6 +276,14 @@ public class SparkHoodieBackedTableMetadataWriter extends 
HoodieBackedTableMetad
     return exprIndexRecords;
   }
 
+  protected MetadataIndexGenerator initializeMetadataIndexGenerator() {
+    return new MetadataIndexGenerator();
+  }
+
+  protected SparkRDDMetadataWriteClient 
getSparkWriteClient(Option<BaseHoodieWriteClient<?, JavaRDD<HoodieRecord>, ?, 
JavaRDD<WriteStatus>>> writeClientOpt) {
+    return ((SparkRDDMetadataWriteClient) 
writeClientOpt.orElse(getWriteClient()));
+  }
+
   @Override
   public BaseHoodieWriteClient<?, JavaRDD<HoodieRecord>, ?, 
JavaRDD<WriteStatus>> initializeWriteClient() {
     return new SparkRDDMetadataWriteClient(engineContext, metadataWriteConfig, 
Option.empty());
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriterTableVersionSix.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriterTableVersionSix.java
index 802a2a66a85..1aa2a88a038 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriterTableVersionSix.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriterTableVersionSix.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.metadata;
 
 import org.apache.hudi.client.BaseHoodieWriteClient;
+import org.apache.hudi.client.HoodieWriteResult;
 import org.apache.hudi.client.SparkRDDMetadataWriteClient;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
@@ -28,6 +29,7 @@ import org.apache.hudi.common.engine.EngineType;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.metrics.Registry;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieIndexDefinition;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
@@ -57,6 +59,7 @@ import java.util.stream.Collectors;
 
 import static 
org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy.EAGER;
 import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
 
 public class SparkHoodieBackedTableMetadataWriterTableVersionSix extends 
HoodieBackedTableMetadataWriterTableVersionSix<JavaRDD<HoodieRecord>, 
JavaRDD<WriteStatus>> {
 
@@ -92,6 +95,11 @@ public class 
SparkHoodieBackedTableMetadataWriterTableVersionSix extends HoodieB
     super(hadoopConf, writeConfig, failedWritesCleaningPolicy, engineContext, 
inflightInstantTimestamp);
   }
 
+  @Override
+  MetadataIndexGenerator initializeMetadataIndexGenerator() {
+    throw new UnsupportedOperationException("Streaming writes are not 
supported for Spark table version six");
+  }
+
   @Override
   protected void initRegistry() {
     if (metadataWriteConfig.isMetricsOn()) {
@@ -119,6 +127,11 @@ public class 
SparkHoodieBackedTableMetadataWriterTableVersionSix extends HoodieB
     return HoodieJavaRDD.getJavaRDD(records);
   }
 
+  @Override
+  protected HoodieData<WriteStatus> 
convertEngineSpecificDataToHoodieData(JavaRDD<WriteStatus> records) {
+    throw new HoodieNotSupportedException("Unsupported flow for table version 
6");
+  }
+
   @Override
   protected void bulkInsertAndCommit(BaseHoodieWriteClient<?, 
JavaRDD<HoodieRecord>, ?, JavaRDD<WriteStatus>> writeClient, String 
instantTime, JavaRDD<HoodieRecord> preppedRecordInputs,
                                      Option<BulkInsertPartitioner> 
bulkInsertPartitioner) {
@@ -132,6 +145,13 @@ public class 
SparkHoodieBackedTableMetadataWriterTableVersionSix extends HoodieB
     writeClient.commit(instantTime, writeStatusJavaRDD, Option.empty(), 
DELTA_COMMIT_ACTION, Collections.emptyMap());
   }
 
+  @Override
+  protected void upsertAndCommit(BaseHoodieWriteClient<?, JavaRDD<HoodieRecord>, ?, JavaRDD<WriteStatus>> writeClient, String instantTime, JavaRDD<HoodieRecord> preppedRecordInputs,
+                                 List<HoodieFileGroupId> fileGroupsIdsToUpdate) {
+    JavaRDD<WriteStatus> writeStatusJavaRDD = writeClient.upsertPreppedRecords(preppedRecordInputs, instantTime);
+    writeClient.commit(instantTime, writeStatusJavaRDD, Option.empty(), DELTA_COMMIT_ACTION, Collections.emptyMap());
+  }
+
   @Override
   protected void bulkCommit(
       String instantTime, String partitionName, HoodieData<HoodieRecord> records,
@@ -148,7 +168,8 @@ public class SparkHoodieBackedTableMetadataWriterTableVersionSix extends HoodieB
     SparkRDDWriteClient writeClient = (SparkRDDWriteClient) getWriteClient();
     String actionType = CommitUtils.getCommitActionType(WriteOperationType.DELETE_PARTITION, HoodieTableType.MERGE_ON_READ);
     writeClient.startCommitForMetadataTable(metadataMetaClient, instantTime, actionType);
-    writeClient.deletePartitions(partitionsToDrop, instantTime);
+    HoodieWriteResult result = writeClient.deletePartitions(partitionsToDrop, instantTime);
+    writeClient.commit(instantTime, result.getWriteStatuses(), Option.empty(), REPLACE_COMMIT_ACTION, result.getPartitionToReplaceFileIds());
   }
 
   @Override
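
Note that table version six deliberately rejects the streaming path; on current table versions the metadata writer is expected to support the three-step streaming lifecycle this commit introduces. A minimal sketch of that flow, condensed from TestMetadataWriterCommit below (the writer, engine context, and commit metadata variables are assumed to be in hand):

    // Sketch only: open a delta commit on the metadata table, stream the
    // data-table write statuses into the streaming-capable MDT partitions,
    // then finalize the commit with the collected write stats.
    mdtWriter.startCommit(instantTime);
    HoodieData<WriteStatus> mdtWriteStatus =
        mdtWriter.streamWriteToMetadataPartitions(dataTableWriteStatuses, instantTime);
    List<HoodieWriteStat> mdtWriteStats = mdtWriteStatus.collectAsList().stream()
        .map(WriteStatus::getStat).collect(Collectors.toList());
    mdtWriter.completeStreamingCommit(instantTime, engineContext, mdtWriteStats, commitMetadata);
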
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkMetadataWriterFactory.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkMetadataWriterFactory.java
index 3c6c9c4c1c3..3e5507213cb 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkMetadataWriterFactory.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkMetadataWriterFactory.java
@@ -49,6 +49,11 @@ public class SparkMetadataWriterFactory {
     }
   }
 
+  public static HoodieTableMetadataWriter createWithStreamingWrites(StorageConfiguration<?> conf, HoodieWriteConfig writeConfig, HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy,
+                                                                    HoodieEngineContext context, Option<String> inflightInstantTimestamp) {
+    return new SparkHoodieBackedTableMetadataWriter(conf, writeConfig, failedWritesCleaningPolicy, context, inflightInstantTimestamp, true);
+  }
+
   public static HoodieTableMetadataWriter create(StorageConfiguration<?> conf, HoodieWriteConfig writeConfig, HoodieEngineContext context, HoodieTableConfig tableConfig) {
     return create(conf, writeConfig, context, Option.empty(), tableConfig);
   }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/BaseTestHandle.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/BaseTestHandle.java
new file mode 100644
index 00000000000..910c5549e7a
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/BaseTestHandle.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.engine.LocalTaskContextSupplier;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.testutils.HoodieSparkClientTestHarness;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Base test harness for write handle tests; creates a base parquet file via {@link HoodieCreateHandle}.
+ */
+public class BaseTestHandle extends HoodieSparkClientTestHarness {
+
+  @BeforeEach
+  public void setUp() throws Exception {
+    initSparkContexts("TestHoodieRowCreateHandle");
+    initPath();
+    initHoodieStorage();
+    initTestDataGenerator();
+    initMetaClient();
+    initTimelineService();
+  }
+
+  @AfterEach
+  public void tearDown() throws Exception {
+    cleanupResources();
+  }
+
+  Pair<WriteStatus, List<HoodieRecord>> createParquetFile(HoodieWriteConfig config, HoodieTable table, String partitionPath,
+                                                          String fileId, String instantTime, HoodieTestDataGenerator dataGenerator) {
+    List<HoodieRecord> records = dataGenerator.generateInserts(instantTime, 100);
+    Map<String, HoodieRecord> recordMap = new HashMap<>();
+    for (int i = 0; i < records.size(); i++) {
+      recordMap.put(String.valueOf(i), records.get(i));
+    }
+    HoodieCreateHandle handle = new HoodieCreateHandle(config, instantTime, table, partitionPath, fileId, recordMap, new LocalTaskContextSupplier());
+    handle.write();
+    handle.close();
+    return Pair.of(handle.writeStatus, records);
+  }
+}
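
The handle tests that follow use this helper to seed a base file before exercising a handle; the typical pattern, condensed from TestCreateHandle, looks like:

    // Sketch only: write 100 generated inserts into a new parquet file,
    // then assert against the returned write status and records.
    String fileId = UUID.randomUUID().toString();
    Pair<WriteStatus, List<HoodieRecord>> result =
        createParquetFile(config, table, partitionPath, fileId, "000", dataGenerator);
    WriteStatus writeStatus = result.getLeft();
    List<HoodieRecord> records = result.getRight();
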
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/KeyGeneratorForDataGeneratorRecords.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/KeyGeneratorForDataGeneratorRecords.java
new file mode 100644
index 00000000000..b93e32a87a0
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/KeyGeneratorForDataGeneratorRecords.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.keygen.BuiltinKeyGenerator;
+
+import org.apache.avro.generic.GenericRecord;
+
+public class KeyGeneratorForDataGeneratorRecords extends BuiltinKeyGenerator {
+
+  public KeyGeneratorForDataGeneratorRecords(TypedProperties config) {
+    super(config);
+  }
+
+  @Override
+  public String getRecordKey(GenericRecord record) {
+    return record.get("_row_key").toString();
+  }
+
+  @Override
+  public String getPartitionPath(GenericRecord record) {
+    return record.get("partition_path").toString();
+  }
+}
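
TestMergeHandle below runs with populateMetaFields disabled, so it wires this generator in to extract keys from the test data generator's "_row_key" and "partition_path" fields, both on the write config and directly on the merge handle:

    // Sketch only, mirroring the argument TestMergeHandle passes to HoodieMergeHandle.
    Option.of(new KeyGeneratorForDataGeneratorRecords(config.getProps()));
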
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestAppendHandle.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestAppendHandle.java
new file mode 100644
index 00000000000..770627b35eb
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestAppendHandle.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.engine.LocalTaskContextSupplier;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordDelegate;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieSparkTable;
+import org.apache.hudi.table.HoodieTable;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Unit tests {@link HoodieAppendHandle}.
+ */
+public class TestAppendHandle extends BaseTestHandle {
+
+  @ParameterizedTest
+  @ValueSource(booleans = { true, false })
+  public void testAppendHandleRLIStats(boolean populateMetaFields) {
+    // init config and table
+    HoodieWriteConfig config = getConfigBuilder(basePath)
+        .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder().withRemoteServerPort(timelineServicePort).build())
+        .withPopulateMetaFields(populateMetaFields)
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).withEnableRecordIndex(true).withStreamingWriteEnabled(true).build())
+        .build();
+
+    HoodieTable table = HoodieSparkTable.create(config, context, metaClient);
+
+    // one round per partition
+    String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[0];
+    // init some args
+    String fileId = UUID.randomUUID().toString();
+    String instantTime = "000";
+
+    config.setSchema(TRIP_EXAMPLE_SCHEMA);
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new String[] {partitionPath});
+    // create parquet file
+    createParquetFile(config, table, partitionPath, fileId, instantTime, dataGenerator);
+    // generate update records
+    instantTime = "001";
+    List<HoodieRecord> records = dataGenerator.generateUniqueUpdates(instantTime, 50);
+    HoodieAppendHandle handle = new HoodieAppendHandle(config, instantTime, table, partitionPath, fileId, records.iterator(), new LocalTaskContextSupplier());
+    Map<String, HoodieRecord> recordMap = new HashMap<>();
+    for (int i = 0; i < records.size(); i++) {
+      recordMap.put(String.valueOf(i), records.get(i));
+    }
+    // write the update records
+    handle.write(recordMap);
+    WriteStatus writeStatus = handle.writeStatus;
+    handle.close();
+
+    assertEquals(records.size(), writeStatus.getTotalRecords());
+    assertEquals(0, writeStatus.getTotalErrorRecords());
+    // validate write status has all record delegates
+    if (populateMetaFields) {
+      assertEquals(records.size(), writeStatus.getWrittenRecordDelegates().size());
+      for (HoodieRecordDelegate recordDelegate : writeStatus.getWrittenRecordDelegates()) {
+        assertTrue(recordDelegate.getNewLocation().isPresent());
+        assertEquals(fileId, recordDelegate.getNewLocation().get().getFileId());
+        assertEquals(instantTime, recordDelegate.getNewLocation().get().getInstantTime());
+      }
+    }
+  }
+}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestCreateHandle.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestCreateHandle.java
new file mode 100644
index 00000000000..672bb342879
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestCreateHandle.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordDelegate;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieSparkTable;
+import org.apache.hudi.table.HoodieTable;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.List;
+import java.util.UUID;
+
+import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Unit tests {@link HoodieCreateHandle}.
+ */
+public class TestCreateHandle extends BaseTestHandle {
+
+  @ParameterizedTest
+  @ValueSource(booleans = { true, false })
+  public void testCreateHandleRLIStats(boolean populateMetaFields) {
+    // init config and table
+    HoodieWriteConfig config = getConfigBuilder(basePath)
+        .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder().withRemoteServerPort(timelineServicePort).build())
+        .withPopulateMetaFields(populateMetaFields)
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).withEnableRecordIndex(true).withStreamingWriteEnabled(true).build())
+        .build();
+
+    HoodieTable table = HoodieSparkTable.create(config, context, metaClient);
+
+    // one round per partition
+    String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[0];
+
+    // init some args
+    String fileId = UUID.randomUUID().toString();
+    String instantTime = "000";
+
+    config.setSchema(TRIP_EXAMPLE_SCHEMA);
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new String[] {partitionPath});
+    Pair<WriteStatus, List<HoodieRecord>> statusListPair = createParquetFile(config, table, partitionPath, fileId, instantTime, dataGenerator);
+    WriteStatus writeStatus = statusListPair.getLeft();
+    List<HoodieRecord> records = statusListPair.getRight();
+
+    assertEquals(records.size(), writeStatus.getTotalRecords());
+    assertEquals(0, writeStatus.getTotalErrorRecords());
+    // validate write status has all record delegates
+    if (populateMetaFields) {
+      assertEquals(records.size(), writeStatus.getWrittenRecordDelegates().size());
+      for (HoodieRecordDelegate recordDelegate : writeStatus.getWrittenRecordDelegates()) {
+        assertTrue(recordDelegate.getNewLocation().isPresent());
+        assertEquals(fileId, recordDelegate.getNewLocation().get().getFileId());
+        assertEquals(instantTime, recordDelegate.getNewLocation().get().getInstantTime());
+      }
+    }
+  }
+}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestMergeHandle.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestMergeHandle.java
new file mode 100644
index 00000000000..c7035727ce5
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestMergeHandle.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.engine.HoodieLocalEngineContext;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieSparkCopyOnWriteTable;
+import org.apache.hudi.table.HoodieSparkTable;
+import org.apache.hudi.table.action.commit.HoodieMergeHelper;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
+
+import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Unit tests {@link HoodieMergeHandle}.
+ */
+public class TestMergeHandle extends BaseTestHandle {
+
+  @Test
+  public void testMergeHandleRLIStats() throws IOException {
+    // init config and table
+    HoodieWriteConfig config = getConfigBuilder(basePath)
+        .withPopulateMetaFields(false)
+        .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder().withRemoteServerPort(timelineServicePort).build())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).withEnableRecordIndex(true).withStreamingWriteEnabled(true).build())
+        .withKeyGenerator(KeyGeneratorForDataGeneratorRecords.class.getCanonicalName())
+        .build();
+    HoodieSparkCopyOnWriteTable table = (HoodieSparkCopyOnWriteTable) HoodieSparkTable.create(config, new HoodieLocalEngineContext(storageConf), metaClient);
+    // one round per partition
+    String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[0];
+    // init some args
+    String fileId = UUID.randomUUID().toString();
+    String instantTime = "000";
+
+    // Create a parquet file
+    config.setSchema(TRIP_EXAMPLE_SCHEMA);
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new String[] {partitionPath});
+    Pair<WriteStatus, List<HoodieRecord>> statusListPair = createParquetFile(config, table, partitionPath, fileId, instantTime, dataGenerator);
+    WriteStatus writeStatus = statusListPair.getLeft();
+    List<HoodieRecord> records = statusListPair.getRight();
+    assertEquals(records.size(), writeStatus.getTotalRecords());
+    assertEquals(0, writeStatus.getTotalErrorRecords());
+
+    instantTime = "001";
+    List<HoodieRecord> updates = dataGenerator.generateUniqueUpdates(instantTime, 10);
+    HoodieMergeHandle mergeHandle = new HoodieMergeHandle(config, instantTime, table, updates.iterator(), partitionPath, fileId, table.getTaskContextSupplier(),
+        new HoodieBaseFile(writeStatus.getStat().getPath()), Option.of(new KeyGeneratorForDataGeneratorRecords(config.getProps())));
+    HoodieMergeHelper.newInstance().runMerge(table, mergeHandle);
+    writeStatus = mergeHandle.writeStatus;
+    // verify stats after merge
+    assertEquals(records.size(), writeStatus.getStat().getNumWrites());
+    assertEquals(10, writeStatus.getStat().getNumUpdateWrites());
+  }
+}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestMetadataWriterCommit.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestMetadataWriterCommit.java
new file mode 100644
index 00000000000..0cbbdb23ea6
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestMetadataWriterCommit.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.testutils.InProcessTimeGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.data.HoodieJavaRDD;
+import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
+import org.apache.hudi.metadata.SparkMetadataWriterFactory;
+import org.apache.hudi.table.HoodieSparkTable;
+import org.apache.hudi.table.HoodieTable;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
+import static org.apache.hudi.metadata.MetadataPartitionType.COLUMN_STATS;
+import static org.apache.hudi.metadata.MetadataPartitionType.FILES;
+import static org.apache.hudi.metadata.MetadataPartitionType.RECORD_INDEX;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+/**
+ * Unit tests for the metadata writer streaming write APIs.
+ */
+public class TestMetadataWriterCommit extends BaseTestHandle {
+
+  @Test
+  public void testMetadataWriterStreamingCommits() throws IOException {
+    // init config and table
+    HoodieWriteConfig config = getConfigBuilder(basePath)
+        .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder().withRemoteServerPort(timelineServicePort).build())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+            .enable(true)
+            .withEnableRecordIndex(true)
+            .withMetadataIndexColumnStats(false)
+            .withSecondaryIndexEnabled(false)
+            .withStreamingWriteEnabled(true)
+            .build())
+        .build();
+
+    HoodieTable table = HoodieSparkTable.create(config, context, metaClient);
+
+    // one round per partition
+    String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[0];
+
+    // init some args
+    String fileId = UUID.randomUUID().toString();
+    String instantTime = InProcessTimeGenerator.createNewInstantTime();
+
+    // create a parquet file and obtain corresponding write status
+    config.setSchema(TRIP_EXAMPLE_SCHEMA);
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new String[] {partitionPath});
+    Pair<WriteStatus, List<HoodieRecord>> statusListPair = createParquetFile(config, table, partitionPath, fileId, instantTime, dataGenerator);
+    WriteStatus writeStatus = statusListPair.getLeft();
+    List<HoodieRecord> records = statusListPair.getRight();
+    HoodieCommitMetadata commitMetadata = createCommitMetadata(writeStatus.getStat(), partitionPath);
+
+    assertEquals(records.size(), writeStatus.getTotalRecords());
+    assertEquals(0, writeStatus.getTotalErrorRecords());
+
+    // create mdt writer
+    HoodieBackedTableMetadataWriter mdtWriter = (HoodieBackedTableMetadataWriter) SparkMetadataWriterFactory.createWithStreamingWrites(storageConf, config,
+        HoodieFailedWritesCleaningPolicy.LAZY, context, Option.empty());
+    HoodieTableMetaClient mdtMetaClient = HoodieTableMetaClient.builder().setBasePath(metaClient.getMetaPath() + "/metadata").setConf(storageConf).build();
+    assertEquals(2, mdtMetaClient.getActiveTimeline().filterCompletedInstants().countInstants());
+
+    // Create commit in MDT
+    mdtWriter = (HoodieBackedTableMetadataWriter) SparkMetadataWriterFactory.createWithStreamingWrites(storageConf, config,
+        HoodieFailedWritesCleaningPolicy.LAZY, context, Option.empty());
+    mdtWriter.startCommit(instantTime);
+    HoodieData<WriteStatus> mdtWriteStatus = mdtWriter.streamWriteToMetadataPartitions(HoodieJavaRDD.of(Collections.singletonList(writeStatus), context, 1), instantTime);
+    List<HoodieWriteStat> mdtWriteStats = mdtWriteStatus.collectAsList().stream().map(WriteStatus::getStat).collect(Collectors.toList());
+    mdtWriter.completeStreamingCommit(instantTime, context, mdtWriteStats, commitMetadata);
+    // 2 bootstrap commits for 2 enabled partitions, 1 commit due to update
+    assertEquals(3, mdtMetaClient.reloadActiveTimeline().filterCompletedInstants().countInstants());
+
+    // verify commit metadata
+    HoodieCommitMetadata mdtCommitMetadata = mdtMetaClient.getActiveTimeline().readCommitMetadata(mdtMetaClient.getActiveTimeline().lastInstant().get());
+    // 2 partitions should be seen in the commit metadata - FILES and Record index
+    assertEquals(2, mdtCommitMetadata.getPartitionToWriteStats().size());
+    assertEquals(1, mdtCommitMetadata.getPartitionToWriteStats().get(FILES.getPartitionPath()).size());
+    assertEquals(10, mdtCommitMetadata.getPartitionToWriteStats().get(RECORD_INDEX.getPartitionPath()).size());
+    assertFalse(mdtCommitMetadata.getPartitionToWriteStats().containsKey(COLUMN_STATS.getPartitionPath()));
+
+    // Create commit in MDT with col stats enabled
+    config.getMetadataConfig().setValue(HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS, "true");
+    instantTime = InProcessTimeGenerator.createNewInstantTime();
+    mdtWriter = (HoodieBackedTableMetadataWriter) SparkMetadataWriterFactory.createWithStreamingWrites(storageConf, config,
+        HoodieFailedWritesCleaningPolicy.LAZY, context, Option.empty());
+    mdtWriter.startCommit(instantTime);
+    mdtWriteStatus = mdtWriter.streamWriteToMetadataPartitions(HoodieJavaRDD.of(Collections.singletonList(writeStatus), context, 1), instantTime);
+    mdtWriteStats = mdtWriteStatus.collectAsList().stream().map(WriteStatus::getStat).collect(Collectors.toList());
+    mdtWriter.completeStreamingCommit(instantTime, context, mdtWriteStats, commitMetadata);
+    // 3 bootstrap commits for 3 enabled partitions, 2 commits due to update
+    assertEquals(5, mdtMetaClient.reloadActiveTimeline().filterCompletedInstants().countInstants());
+
+    // Verify commit metadata
+    mdtCommitMetadata = mdtMetaClient.getActiveTimeline().readCommitMetadata(mdtMetaClient.getActiveTimeline().lastInstant().get());
+    // 3 partitions should be seen in the commit metadata - FILES, Record index and Column stats
+    assertEquals(3, mdtCommitMetadata.getPartitionToWriteStats().size());
+    assertEquals(1, mdtCommitMetadata.getPartitionToWriteStats().get(FILES.getPartitionPath()).size());
+    assertEquals(10, mdtCommitMetadata.getPartitionToWriteStats().get(RECORD_INDEX.getPartitionPath()).size());
+    assertEquals(2, mdtCommitMetadata.getPartitionToWriteStats().get(COLUMN_STATS.getPartitionPath()).size());
+  }
+
+  public static HoodieCommitMetadata createCommitMetadata(HoodieWriteStat writeStat, String partitionPath) {
+    HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
+    commitMetadata.addMetadata("test", "test");
+    commitMetadata.addWriteStat(partitionPath, writeStat);
+    commitMetadata.setOperationType(WriteOperationType.INSERT);
+    return commitMetadata;
+  }
+}
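
As the assertions above suggest, enabling an additional index mid-stream (column stats here) appears to trigger a bootstrap commit for the new partition on the next writer instantiation, which is why the completed instant count grows from 3 to 5 across the second streaming commit: one bootstrap plus one streaming update.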
