nsivabalan commented on code in PR #13295:
URL: https://github.com/apache/hudi/pull/13295#discussion_r2112791251


##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDMetadataWriteClient.java:
##########
@@ -0,0 +1,414 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.avro.model.HoodieMetadataRecord;
+import org.apache.hudi.common.config.HoodieCommonConfig;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.io.storage.HoodieIOFactory;
+import org.apache.hudi.io.storage.HoodieSeekingFileReader;
+import org.apache.hudi.metadata.HoodieMetadataLogRecordReader;
+import org.apache.hudi.metadata.HoodieMetadataPayload;
+import org.apache.hudi.metadata.HoodieMetadataWriteUtils;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.testutils.HoodieClientTestBase;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.api.java.JavaRDD;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION;
+import static 
org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR;
+import static org.apache.hudi.common.util.CollectionUtils.toStream;
+import static 
org.apache.hudi.common.util.ConfigUtils.DEFAULT_HUDI_CONFIG_FOR_READER;
+import static 
org.apache.hudi.metadata.HoodieTableMetadata.RECORDKEY_PARTITION_LIST;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestSparkRDDMetadataWriteClient extends HoodieClientTestBase {
+
+  private Random random;
+
+  @BeforeEach
+  public void setUp() throws Exception {
+    super.setUp();
+    random = new Random(0xDEED);
+  }
+
+  @Test
+  public void testWritesViaMetadataWriteClient() throws Exception {
+
+    HoodieWriteConfig hoodieWriteConfig = getConfigBuilder()
+        
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).withMetadataIndexColumnStats(false).withEnableRecordIndex(true)
+            .withRecordIndexFileGroupCount(1, 1).build()).build();
+
+    // trigger end to end write to data table so that metadata table is also 
initialized.
+    initDataTableWithACommit(hoodieWriteConfig);
+
+    // fetch metadata file slice info
+    HoodieWriteConfig mdtWriteConfig = 
HoodieMetadataWriteUtils.createMetadataWriteConfig(hoodieWriteConfig, 
HoodieFailedWritesCleaningPolicy.EAGER);
+    Map<String, List<String>> mdtPartitionsFileIdMapping = new HashMap<>();
+    List<HoodieFileGroupId> nonFilesPartitionFileGroupIdList = new 
ArrayList<>();
+    List<HoodieFileGroupId> filesPartitionFileGroupIdList = new ArrayList<>();
+    HoodieTableMetaClient metadataMetaClient = 
HoodieTableMetaClient.builder().setBasePath(mdtWriteConfig.getBasePath()).setConf(context.getStorageConf()).build();
+    fetchMetadataFileSliceInfo(metadataMetaClient, 
filesPartitionFileGroupIdList, nonFilesPartitionFileGroupIdList, 
mdtPartitionsFileIdMapping);
+
+    List<HoodieRecord> filesPartitionExpectedRecords = new ArrayList<>();
+    Map<String, HoodieRecord> filesPartitionExpectedRecordsMap = new 
HashMap<>();
+    List<String> expectedAllPartitions = new ArrayList<>();
+    List<HoodieRecord> rliRecords = new ArrayList<>();
+    Map<String, HoodieRecord> rliPartitionExpectedRecordsMap = new HashMap<>();
+    String commitTimeOfInterest = null;
+
+    // create Write client to SparkRDDMetadataWriteClient and trigger writes.
+    try (SparkRDDMetadataWriteClient client = new 
SparkRDDMetadataWriteClient(context, mdtWriteConfig)) {
+      commitTimeOfInterest = client.createNewInstantTime();
+
+      // prepare FILES partition records.
+      
prepareFilesPartitionRecords(mdtPartitionsFileIdMapping.get(MetadataPartitionType.FILES.getPartitionPath()).get(0),
+          commitTimeOfInterest, filesPartitionExpectedRecords, 
filesPartitionExpectedRecordsMap, expectedAllPartitions);
+
+      // prepare RLI records.
+      prepareRliRecords(commitTimeOfInterest, mdtPartitionsFileIdMapping, 
expectedAllPartitions, rliRecords, rliPartitionExpectedRecordsMap);
+
+      // ingest RLI records to metadata table.
+      client.startCommitForMetadataTable(metadataMetaClient, 
commitTimeOfInterest, DELTA_COMMIT_ACTION);
+      JavaRDD<WriteStatus> partialWriteStatusesRDD = 
client.upsertPreppedRecords(jsc.parallelize(rliRecords), commitTimeOfInterest, 
Option.of(nonFilesPartitionFileGroupIdList));
+      List<WriteStatus> partialWriteStatuses = 
partialWriteStatusesRDD.collect();
+
+      // validate that the commit is still pending since we are streaming 
write to metadata table.
+      HoodieActiveTimeline reloadedMdtActiveTimeline = 
metadataMetaClient.reloadActiveTimeline();
+      
assertEquals(reloadedMdtActiveTimeline.filterCompletedInstants().getInstants().stream().count(),
 3); // files, rli instantiation and 1 write to data table.
+      String finalCommitTimeOfInterest = commitTimeOfInterest;
+      
assertTrue(reloadedMdtActiveTimeline.filterInflightsAndRequested().getInstants().stream().anyMatch(instant
 -> instant.requestedTime().equals(finalCommitTimeOfInterest)));

Review Comment:
   I mean, when Hudi tries to create another inflight file for an existing 
commit, it will fail by default. 
   The config `hoodie.allow.multi.write.on.same.instant` dictates this, 
   and the default value for this config is false. 
   
   



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkMetadataTableUpsertCommitActionExecutor.java:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieCommitException;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.WorkloadProfile;
+import org.apache.hudi.table.WorkloadStat;
+import 
org.apache.hudi.table.action.deltacommit.SparkUpsertPreppedDeltaCommitActionExecutor;
+
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.storage.StorageLevel;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Upsert commit action executor for Metadata table.
+ *
+ * @param <T>
+ */
+public class SparkMetadataTableUpsertCommitActionExecutor<T> extends 
SparkUpsertPreppedDeltaCommitActionExecutor<T> {
+
+  private static final WorkloadStat PLACEHOLDER_GLOBAL_STAT = new 
WorkloadStat();
+  private final List<HoodieFileGroupId> mdtFileGroupIdList;
+  private final boolean initialCall;
+
+  public SparkMetadataTableUpsertCommitActionExecutor(HoodieSparkEngineContext 
context, HoodieWriteConfig config, HoodieTable table, String instantTime,
+                                                      
HoodieData<HoodieRecord<T>> preppedRecords, List<HoodieFileGroupId> 
mdtFileGroupIdList,
+                                                      boolean initialCall) {
+    super(context, config, table, instantTime, preppedRecords);
+    this.mdtFileGroupIdList = mdtFileGroupIdList;
+    this.initialCall = initialCall;
+  }
+
+  @Override
+  protected boolean shouldPersistInputRecords(JavaRDD<HoodieRecord<T>> 
inputRDD) {
+    return inputRDD.getStorageLevel() == StorageLevel.NONE();
+  }
+
+  @Override
+  protected WorkloadProfile prepareWorkloadProfile(HoodieData<HoodieRecord<T>> 
inputRecordsWithClusteringUpdate) {
+    // create workload profile only when we are writing to FILES partition in 
Metadata table.
+    WorkloadProfile workloadProfile = new 
WorkloadProfile(Pair.of(Collections.emptyMap(), PLACEHOLDER_GLOBAL_STAT));
+    return workloadProfile;
+  }
+
+  protected void saveWorkloadProfileMetadataToInflight(WorkloadProfile 
profile, String instantTime)
+      throws HoodieCommitException {
+    // with streaming writes support, we might write to metadata table 
multiple times for the same instant times.
+    // ie. writeClient.startCommit(t1), writeClient.upsert(batch1, t1), 
writeClient.upsert(batch2, t1), writeClient.commit(t1, ...)
+    // So, here we are generating inflight file only in the last known writes, 
which we know will only have FILES partition.
+    if (!initialCall) {

Review Comment:
   It does not really matter. My initial intention was to create the inflight file just 
when adding to the FILES partition.
   But logically speaking, we should add the inflight when we are writing the first 
batch of records to the MDT. So, I can flip this. 
   



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDMetadataWriteClient.java:
##########
@@ -0,0 +1,414 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.avro.model.HoodieMetadataRecord;
+import org.apache.hudi.common.config.HoodieCommonConfig;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.io.storage.HoodieIOFactory;
+import org.apache.hudi.io.storage.HoodieSeekingFileReader;
+import org.apache.hudi.metadata.HoodieMetadataLogRecordReader;
+import org.apache.hudi.metadata.HoodieMetadataPayload;
+import org.apache.hudi.metadata.HoodieMetadataWriteUtils;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.testutils.HoodieClientTestBase;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.api.java.JavaRDD;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION;
+import static 
org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR;
+import static org.apache.hudi.common.util.CollectionUtils.toStream;
+import static 
org.apache.hudi.common.util.ConfigUtils.DEFAULT_HUDI_CONFIG_FOR_READER;
+import static 
org.apache.hudi.metadata.HoodieTableMetadata.RECORDKEY_PARTITION_LIST;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestSparkRDDMetadataWriteClient extends HoodieClientTestBase {
+
+  private Random random;
+
+  @BeforeEach
+  public void setUp() throws Exception {
+    super.setUp();
+    random = new Random(0xDEED);
+  }
+
+  @Test
+  public void testWritesViaMetadataWriteClient() throws Exception {
+
+    HoodieWriteConfig hoodieWriteConfig = getConfigBuilder()
+        
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).withMetadataIndexColumnStats(false).withEnableRecordIndex(true)
+            .withRecordIndexFileGroupCount(1, 1).build()).build();
+
+    // trigger end to end write to data table so that metadata table is also 
initialized.
+    initDataTableWithACommit(hoodieWriteConfig);
+
+    // fetch metadata file slice info
+    HoodieWriteConfig mdtWriteConfig = 
HoodieMetadataWriteUtils.createMetadataWriteConfig(hoodieWriteConfig, 
HoodieFailedWritesCleaningPolicy.EAGER);
+    Map<String, List<String>> mdtPartitionsFileIdMapping = new HashMap<>();
+    List<HoodieFileGroupId> nonFilesPartitionFileGroupIdList = new 
ArrayList<>();
+    List<HoodieFileGroupId> filesPartitionFileGroupIdList = new ArrayList<>();
+    HoodieTableMetaClient metadataMetaClient = 
HoodieTableMetaClient.builder().setBasePath(mdtWriteConfig.getBasePath()).setConf(context.getStorageConf()).build();
+    fetchMetadataFileSliceInfo(metadataMetaClient, 
filesPartitionFileGroupIdList, nonFilesPartitionFileGroupIdList, 
mdtPartitionsFileIdMapping);
+
+    List<HoodieRecord> filesPartitionExpectedRecords = new ArrayList<>();
+    Map<String, HoodieRecord> filesPartitionExpectedRecordsMap = new 
HashMap<>();
+    List<String> expectedAllPartitions = new ArrayList<>();
+    List<HoodieRecord> rliRecords = new ArrayList<>();
+    Map<String, HoodieRecord> rliPartitionExpectedRecordsMap = new HashMap<>();
+    String commitTimeOfInterest = null;
+
+    // create Write client to SparkRDDMetadataWriteClient and trigger writes.
+    try (SparkRDDMetadataWriteClient client = new 
SparkRDDMetadataWriteClient(context, mdtWriteConfig)) {
+      commitTimeOfInterest = client.createNewInstantTime();
+
+      // prepare FILES partition records.
+      
prepareFilesPartitionRecords(mdtPartitionsFileIdMapping.get(MetadataPartitionType.FILES.getPartitionPath()).get(0),
+          commitTimeOfInterest, filesPartitionExpectedRecords, 
filesPartitionExpectedRecordsMap, expectedAllPartitions);
+
+      // prepare RLI records.
+      prepareRliRecords(commitTimeOfInterest, mdtPartitionsFileIdMapping, 
expectedAllPartitions, rliRecords, rliPartitionExpectedRecordsMap);
+
+      // ingest RLI records to metadata table.
+      client.startCommitForMetadataTable(metadataMetaClient, 
commitTimeOfInterest, DELTA_COMMIT_ACTION);
+      JavaRDD<WriteStatus> partialWriteStatusesRDD = 
client.upsertPreppedRecords(jsc.parallelize(rliRecords), commitTimeOfInterest, 
Option.of(nonFilesPartitionFileGroupIdList));
+      List<WriteStatus> partialWriteStatuses = 
partialWriteStatusesRDD.collect();
+
+      // validate that the commit is still pending since we are streaming 
write to metadata table.
+      HoodieActiveTimeline reloadedMdtActiveTimeline = 
metadataMetaClient.reloadActiveTimeline();
+      
assertEquals(reloadedMdtActiveTimeline.filterCompletedInstants().getInstants().stream().count(),
 3); // files, rli instantiation and 1 write to data table.
+      String finalCommitTimeOfInterest = commitTimeOfInterest;
+      
assertTrue(reloadedMdtActiveTimeline.filterInflightsAndRequested().getInstants().stream().anyMatch(instant
 -> instant.requestedTime().equals(finalCommitTimeOfInterest)));
+
+      // write to FILES partition
+      JavaRDD<WriteStatus> filePartitionWriteStatusesRDD = 
client.upsertPreppedRecords(jsc.parallelize(filesPartitionExpectedRecords), 
commitTimeOfInterest, Option.of(filesPartitionFileGroupIdList));
+      List<WriteStatus> filesPartitionWriteStatus = 
filePartitionWriteStatusesRDD.collect();
+      List<HoodieWriteStat> allWriteStats = new ArrayList<>();
+      allWriteStats.addAll(partialWriteStatuses.stream().map(writeStatus -> 
writeStatus.getStat()).collect(Collectors.toList()));
+      allWriteStats.addAll(filesPartitionWriteStatus.stream().map(writeStatus 
-> writeStatus.getStat()).collect(Collectors.toList()));
+      client.commitStats(commitTimeOfInterest, allWriteStats, Option.empty(), 
DELTA_COMMIT_ACTION);
+    }
+
+    // validate
+    readFromMetadataTableAndValidateRecords(metadataMetaClient, 
hoodieWriteConfig, filesPartitionExpectedRecordsMap, 
rliPartitionExpectedRecordsMap, commitTimeOfInterest);
+  }
+
+  private void initDataTableWithACommit(HoodieWriteConfig hoodieWriteConfig) 
throws Exception {
+    try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig)) 
{
+      final int numRecords = 100;
+      String newCommitTime = client.createNewInstantTime();
+      insertBatch(hoodieWriteConfig, client, newCommitTime, 
HoodieTimeline.INIT_INSTANT_TS, numRecords, SparkRDDWriteClient::insert,
+          false, true, numRecords, numRecords, 1, Option.empty(), 
INSTANT_GENERATOR);
+    }
+  }
+
+  private void fetchMetadataFileSliceInfo(HoodieTableMetaClient 
metadataMetaClient, List<HoodieFileGroupId> filesPartitionFileGroupIdList,
+                                          List<HoodieFileGroupId> 
nonFilesPartitionsFileGroupIdList, Map<String, List<String>> 
mdtPartitionsFileIdMapping) {
+    try (HoodieTableFileSystemView fsView = 
HoodieTableMetadataUtil.getFileSystemViewForMetadataTable(metadataMetaClient)) {
+      List<FileSlice> fileSlices =
+          
HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient, 
Option.ofNullable(fsView), MetadataPartitionType.FILES.getPartitionPath());
+      
mdtPartitionsFileIdMapping.put(MetadataPartitionType.FILES.getPartitionPath(), 
fileSlices.stream().map(fileSlice -> 
fileSlice.getFileId()).collect(Collectors.toList()));
+      fileSlices.stream().forEach(fileSlice -> {

Review Comment:
   sure. 



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDMetadataWriteClient.java:
##########
@@ -0,0 +1,414 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.avro.model.HoodieMetadataRecord;
+import org.apache.hudi.common.config.HoodieCommonConfig;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.io.storage.HoodieIOFactory;
+import org.apache.hudi.io.storage.HoodieSeekingFileReader;
+import org.apache.hudi.metadata.HoodieMetadataLogRecordReader;
+import org.apache.hudi.metadata.HoodieMetadataPayload;
+import org.apache.hudi.metadata.HoodieMetadataWriteUtils;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.testutils.HoodieClientTestBase;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.api.java.JavaRDD;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION;
+import static 
org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR;
+import static org.apache.hudi.common.util.CollectionUtils.toStream;
+import static 
org.apache.hudi.common.util.ConfigUtils.DEFAULT_HUDI_CONFIG_FOR_READER;
+import static 
org.apache.hudi.metadata.HoodieTableMetadata.RECORDKEY_PARTITION_LIST;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestSparkRDDMetadataWriteClient extends HoodieClientTestBase {
+
+  private Random random;
+
+  @BeforeEach
+  public void setUp() throws Exception {
+    super.setUp();
+    random = new Random(0xDEED);
+  }
+
+  @Test
+  public void testWritesViaMetadataWriteClient() throws Exception {
+
+    HoodieWriteConfig hoodieWriteConfig = getConfigBuilder()
+        
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).withMetadataIndexColumnStats(false).withEnableRecordIndex(true)
+            .withRecordIndexFileGroupCount(1, 1).build()).build();
+
+    // trigger end to end write to data table so that metadata table is also 
initialized.
+    initDataTableWithACommit(hoodieWriteConfig);
+
+    // fetch metadata file slice info
+    HoodieWriteConfig mdtWriteConfig = 
HoodieMetadataWriteUtils.createMetadataWriteConfig(hoodieWriteConfig, 
HoodieFailedWritesCleaningPolicy.EAGER);
+    Map<String, List<String>> mdtPartitionsFileIdMapping = new HashMap<>();
+    List<HoodieFileGroupId> nonFilesPartitionFileGroupIdList = new 
ArrayList<>();
+    List<HoodieFileGroupId> filesPartitionFileGroupIdList = new ArrayList<>();
+    HoodieTableMetaClient metadataMetaClient = 
HoodieTableMetaClient.builder().setBasePath(mdtWriteConfig.getBasePath()).setConf(context.getStorageConf()).build();
+    fetchMetadataFileSliceInfo(metadataMetaClient, 
filesPartitionFileGroupIdList, nonFilesPartitionFileGroupIdList, 
mdtPartitionsFileIdMapping);
+
+    List<HoodieRecord> filesPartitionExpectedRecords = new ArrayList<>();
+    Map<String, HoodieRecord> filesPartitionExpectedRecordsMap = new 
HashMap<>();
+    List<String> expectedAllPartitions = new ArrayList<>();
+    List<HoodieRecord> rliRecords = new ArrayList<>();
+    Map<String, HoodieRecord> rliPartitionExpectedRecordsMap = new HashMap<>();
+    String commitTimeOfInterest = null;
+
+    // create Write client to SparkRDDMetadataWriteClient and trigger writes.
+    try (SparkRDDMetadataWriteClient client = new 
SparkRDDMetadataWriteClient(context, mdtWriteConfig)) {
+      commitTimeOfInterest = client.createNewInstantTime();
+
+      // prepare FILES partition records.
+      
prepareFilesPartitionRecords(mdtPartitionsFileIdMapping.get(MetadataPartitionType.FILES.getPartitionPath()).get(0),
+          commitTimeOfInterest, filesPartitionExpectedRecords, 
filesPartitionExpectedRecordsMap, expectedAllPartitions);
+
+      // prepare RLI records.
+      prepareRliRecords(commitTimeOfInterest, mdtPartitionsFileIdMapping, 
expectedAllPartitions, rliRecords, rliPartitionExpectedRecordsMap);
+
+      // ingest RLI records to metadata table.
+      client.startCommitForMetadataTable(metadataMetaClient, 
commitTimeOfInterest, DELTA_COMMIT_ACTION);
+      JavaRDD<WriteStatus> partialWriteStatusesRDD = 
client.upsertPreppedRecords(jsc.parallelize(rliRecords), commitTimeOfInterest, 
Option.of(nonFilesPartitionFileGroupIdList));
+      List<WriteStatus> partialWriteStatuses = 
partialWriteStatusesRDD.collect();
+
+      // validate that the commit is still pending since we are streaming 
write to metadata table.
+      HoodieActiveTimeline reloadedMdtActiveTimeline = 
metadataMetaClient.reloadActiveTimeline();
+      
assertEquals(reloadedMdtActiveTimeline.filterCompletedInstants().getInstants().stream().count(),
 3); // files, rli instantiation and 1 write to data table.
+      String finalCommitTimeOfInterest = commitTimeOfInterest;
+      
assertTrue(reloadedMdtActiveTimeline.filterInflightsAndRequested().getInstants().stream().anyMatch(instant
 -> instant.requestedTime().equals(finalCommitTimeOfInterest)));
+
+      // write to FILES partition
+      JavaRDD<WriteStatus> filePartitionWriteStatusesRDD = 
client.upsertPreppedRecords(jsc.parallelize(filesPartitionExpectedRecords), 
commitTimeOfInterest, Option.of(filesPartitionFileGroupIdList));
+      List<WriteStatus> filesPartitionWriteStatus = 
filePartitionWriteStatusesRDD.collect();
+      List<HoodieWriteStat> allWriteStats = new ArrayList<>();
+      allWriteStats.addAll(partialWriteStatuses.stream().map(writeStatus -> 
writeStatus.getStat()).collect(Collectors.toList()));
+      allWriteStats.addAll(filesPartitionWriteStatus.stream().map(writeStatus 
-> writeStatus.getStat()).collect(Collectors.toList()));
+      client.commitStats(commitTimeOfInterest, allWriteStats, Option.empty(), 
DELTA_COMMIT_ACTION);
+    }
+
+    // validate
+    readFromMetadataTableAndValidateRecords(metadataMetaClient, 
hoodieWriteConfig, filesPartitionExpectedRecordsMap, 
rliPartitionExpectedRecordsMap, commitTimeOfInterest);
+  }
+
+  private void initDataTableWithACommit(HoodieWriteConfig hoodieWriteConfig) 
throws Exception {
+    try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig)) 
{
+      final int numRecords = 100;
+      String newCommitTime = client.createNewInstantTime();
+      insertBatch(hoodieWriteConfig, client, newCommitTime, 
HoodieTimeline.INIT_INSTANT_TS, numRecords, SparkRDDWriteClient::insert,
+          false, true, numRecords, numRecords, 1, Option.empty(), 
INSTANT_GENERATOR);
+    }
+  }
+
+  private void fetchMetadataFileSliceInfo(HoodieTableMetaClient 
metadataMetaClient, List<HoodieFileGroupId> filesPartitionFileGroupIdList,
+                                          List<HoodieFileGroupId> 
nonFilesPartitionsFileGroupIdList, Map<String, List<String>> 
mdtPartitionsFileIdMapping) {
+    try (HoodieTableFileSystemView fsView = 
HoodieTableMetadataUtil.getFileSystemViewForMetadataTable(metadataMetaClient)) {
+      List<FileSlice> fileSlices =
+          
HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient, 
Option.ofNullable(fsView), MetadataPartitionType.FILES.getPartitionPath());
+      
mdtPartitionsFileIdMapping.put(MetadataPartitionType.FILES.getPartitionPath(), 
fileSlices.stream().map(fileSlice -> 
fileSlice.getFileId()).collect(Collectors.toList()));
+      fileSlices.stream().forEach(fileSlice -> {
+        filesPartitionFileGroupIdList.add(new 
HoodieFileGroupId(MetadataPartitionType.FILES.getPartitionPath(), 
fileSlice.getFileId()));
+      });
+
+      fileSlices =
+          
HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient, 
Option.ofNullable(fsView), 
MetadataPartitionType.RECORD_INDEX.getPartitionPath());
+      
mdtPartitionsFileIdMapping.put(MetadataPartitionType.RECORD_INDEX.getPartitionPath(),
 fileSlices.stream().map(fileSlice -> 
fileSlice.getFileId()).collect(Collectors.toList()));
+      fileSlices.stream().forEach(fileSlice -> {
+        nonFilesPartitionsFileGroupIdList.add(new 
HoodieFileGroupId(MetadataPartitionType.RECORD_INDEX.getPartitionPath(), 
fileSlice.getFileId()));
+      });
+    }
+  }
+
+  private void readFromMetadataTableAndValidateRecords(HoodieTableMetaClient 
metadataMetaClient, HoodieWriteConfig hoodieWriteConfig,
+                                                       Map<String, 
HoodieRecord> filesPartitionExpectedRecordsMap,
+                                                       Map<String, 
HoodieRecord> rliPartitionExpectedRecordsMap,
+                                                       String 
validMetadataInstant) throws IOException {
+    // read from MDT and validate all records.
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    metadataMetaClient = HoodieTableMetaClient.reload(metadataMetaClient);
+    try (HoodieTableFileSystemView fsView = 
HoodieTableMetadataUtil.getFileSystemViewForMetadataTable(metadataMetaClient)) {
+      FileSlice filesFileSlice =
+          
HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient, 
Option.ofNullable(fsView), 
MetadataPartitionType.FILES.getPartitionPath()).get(0);

Review Comment:
   ack



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDMetadataWriteClient.java:
##########
@@ -0,0 +1,414 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.avro.model.HoodieMetadataRecord;
+import org.apache.hudi.common.config.HoodieCommonConfig;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.io.storage.HoodieIOFactory;
+import org.apache.hudi.io.storage.HoodieSeekingFileReader;
+import org.apache.hudi.metadata.HoodieMetadataLogRecordReader;
+import org.apache.hudi.metadata.HoodieMetadataPayload;
+import org.apache.hudi.metadata.HoodieMetadataWriteUtils;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.testutils.HoodieClientTestBase;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.api.java.JavaRDD;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION;
+import static 
org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR;
+import static org.apache.hudi.common.util.CollectionUtils.toStream;
+import static 
org.apache.hudi.common.util.ConfigUtils.DEFAULT_HUDI_CONFIG_FOR_READER;
+import static 
org.apache.hudi.metadata.HoodieTableMetadata.RECORDKEY_PARTITION_LIST;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestSparkRDDMetadataWriteClient extends HoodieClientTestBase {
+
+  private Random random;
+
+  @BeforeEach
+  public void setUp() throws Exception {
+    super.setUp();
+    random = new Random(0xDEED);
+  }
+
+  @Test
+  public void testWritesViaMetadataWriteClient() throws Exception {
+
+    HoodieWriteConfig hoodieWriteConfig = getConfigBuilder()
+        
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).withMetadataIndexColumnStats(false).withEnableRecordIndex(true)
+            .withRecordIndexFileGroupCount(1, 1).build()).build();
+
+    // trigger end to end write to data table so that metadata table is also 
initialized.
+    initDataTableWithACommit(hoodieWriteConfig);
+
+    // fetch metadata file slice info
+    HoodieWriteConfig mdtWriteConfig = 
HoodieMetadataWriteUtils.createMetadataWriteConfig(hoodieWriteConfig, 
HoodieFailedWritesCleaningPolicy.EAGER);
+    Map<String, List<String>> mdtPartitionsFileIdMapping = new HashMap<>();
+    List<HoodieFileGroupId> nonFilesPartitionFileGroupIdList = new 
ArrayList<>();
+    List<HoodieFileGroupId> filesPartitionFileGroupIdList = new ArrayList<>();
+    HoodieTableMetaClient metadataMetaClient = 
HoodieTableMetaClient.builder().setBasePath(mdtWriteConfig.getBasePath()).setConf(context.getStorageConf()).build();
+    fetchMetadataFileSliceInfo(metadataMetaClient, 
filesPartitionFileGroupIdList, nonFilesPartitionFileGroupIdList, 
mdtPartitionsFileIdMapping);
+
+    List<HoodieRecord> filesPartitionExpectedRecords = new ArrayList<>();
+    Map<String, HoodieRecord> filesPartitionExpectedRecordsMap = new 
HashMap<>();
+    List<String> expectedAllPartitions = new ArrayList<>();
+    List<HoodieRecord> rliRecords = new ArrayList<>();
+    Map<String, HoodieRecord> rliPartitionExpectedRecordsMap = new HashMap<>();
+    String commitTimeOfInterest = null;
+
+    // create Write client to SparkRDDMetadataWriteClient and trigger writes.
+    try (SparkRDDMetadataWriteClient client = new 
SparkRDDMetadataWriteClient(context, mdtWriteConfig)) {
+      commitTimeOfInterest = client.createNewInstantTime();
+
+      // prepare FILES partition records.
+      
prepareFilesPartitionRecords(mdtPartitionsFileIdMapping.get(MetadataPartitionType.FILES.getPartitionPath()).get(0),
+          commitTimeOfInterest, filesPartitionExpectedRecords, 
filesPartitionExpectedRecordsMap, expectedAllPartitions);
+
+      // prepare RLI records.
+      prepareRliRecords(commitTimeOfInterest, mdtPartitionsFileIdMapping, 
expectedAllPartitions, rliRecords, rliPartitionExpectedRecordsMap);
+
+      // ingest RLI records to metadata table.
+      client.startCommitForMetadataTable(metadataMetaClient, 
commitTimeOfInterest, DELTA_COMMIT_ACTION);
+      JavaRDD<WriteStatus> partialWriteStatusesRDD = 
client.upsertPreppedRecords(jsc.parallelize(rliRecords), commitTimeOfInterest, 
Option.of(nonFilesPartitionFileGroupIdList));
+      List<WriteStatus> partialWriteStatuses = 
partialWriteStatusesRDD.collect();
+
+      // validate that the commit is still pending since we are streaming 
write to metadata table.
+      HoodieActiveTimeline reloadedMdtActiveTimeline = 
metadataMetaClient.reloadActiveTimeline();
+      
assertEquals(reloadedMdtActiveTimeline.filterCompletedInstants().getInstants().stream().count(),
 3); // files, rli instantiation and 1 write to data table.
+      String finalCommitTimeOfInterest = commitTimeOfInterest;
+      
assertTrue(reloadedMdtActiveTimeline.filterInflightsAndRequested().getInstants().stream().anyMatch(instant
 -> instant.requestedTime().equals(finalCommitTimeOfInterest)));
+
+      // write to FILES partition
+      JavaRDD<WriteStatus> filePartitionWriteStatusesRDD = 
client.upsertPreppedRecords(jsc.parallelize(filesPartitionExpectedRecords), 
commitTimeOfInterest, Option.of(filesPartitionFileGroupIdList));
+      List<WriteStatus> filesPartitionWriteStatus = 
filePartitionWriteStatusesRDD.collect();
+      List<HoodieWriteStat> allWriteStats = new ArrayList<>();
+      allWriteStats.addAll(partialWriteStatuses.stream().map(writeStatus -> 
writeStatus.getStat()).collect(Collectors.toList()));
+      allWriteStats.addAll(filesPartitionWriteStatus.stream().map(writeStatus 
-> writeStatus.getStat()).collect(Collectors.toList()));
+      client.commitStats(commitTimeOfInterest, allWriteStats, Option.empty(), 
DELTA_COMMIT_ACTION);
+    }
+
+    // validate
+    readFromMetadataTableAndValidateRecords(metadataMetaClient, 
hoodieWriteConfig, filesPartitionExpectedRecordsMap, 
rliPartitionExpectedRecordsMap, commitTimeOfInterest);
+  }
+
+  private void initDataTableWithACommit(HoodieWriteConfig hoodieWriteConfig) 
throws Exception {
+    try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig)) 
{
+      final int numRecords = 100;
+      String newCommitTime = client.createNewInstantTime();
+      insertBatch(hoodieWriteConfig, client, newCommitTime, 
HoodieTimeline.INIT_INSTANT_TS, numRecords, SparkRDDWriteClient::insert,
+          false, true, numRecords, numRecords, 1, Option.empty(), 
INSTANT_GENERATOR);
+    }
+  }
+
+  private void fetchMetadataFileSliceInfo(HoodieTableMetaClient 
metadataMetaClient, List<HoodieFileGroupId> filesPartitionFileGroupIdList,
+                                          List<HoodieFileGroupId> 
nonFilesPartitionsFileGroupIdList, Map<String, List<String>> 
mdtPartitionsFileIdMapping) {
+    try (HoodieTableFileSystemView fsView = 
HoodieTableMetadataUtil.getFileSystemViewForMetadataTable(metadataMetaClient)) {
+      List<FileSlice> fileSlices =
+          
HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient, 
Option.ofNullable(fsView), MetadataPartitionType.FILES.getPartitionPath());
+      
mdtPartitionsFileIdMapping.put(MetadataPartitionType.FILES.getPartitionPath(), 
fileSlices.stream().map(fileSlice -> 
fileSlice.getFileId()).collect(Collectors.toList()));
+      fileSlices.stream().forEach(fileSlice -> {
+        filesPartitionFileGroupIdList.add(new 
HoodieFileGroupId(MetadataPartitionType.FILES.getPartitionPath(), 
fileSlice.getFileId()));
+      });
+
+      fileSlices =
+          
HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient, 
Option.ofNullable(fsView), 
MetadataPartitionType.RECORD_INDEX.getPartitionPath());
+      
mdtPartitionsFileIdMapping.put(MetadataPartitionType.RECORD_INDEX.getPartitionPath(),
 fileSlices.stream().map(fileSlice -> 
fileSlice.getFileId()).collect(Collectors.toList()));
+      fileSlices.stream().forEach(fileSlice -> {

Review Comment:
   sure.



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDMetadataWriteClient.java:
##########
@@ -0,0 +1,414 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.avro.model.HoodieMetadataRecord;
+import org.apache.hudi.common.config.HoodieCommonConfig;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.io.storage.HoodieIOFactory;
+import org.apache.hudi.io.storage.HoodieSeekingFileReader;
+import org.apache.hudi.metadata.HoodieMetadataLogRecordReader;
+import org.apache.hudi.metadata.HoodieMetadataPayload;
+import org.apache.hudi.metadata.HoodieMetadataWriteUtils;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.testutils.HoodieClientTestBase;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.api.java.JavaRDD;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION;
+import static 
org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR;
+import static org.apache.hudi.common.util.CollectionUtils.toStream;
+import static 
org.apache.hudi.common.util.ConfigUtils.DEFAULT_HUDI_CONFIG_FOR_READER;
+import static 
org.apache.hudi.metadata.HoodieTableMetadata.RECORDKEY_PARTITION_LIST;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestSparkRDDMetadataWriteClient extends HoodieClientTestBase {
+
+  private Random random;
+
+  @BeforeEach
+  public void setUp() throws Exception {
+    super.setUp();
+    random = new Random(0xDEED);
+  }
+
+  @Test
+  public void testWritesViaMetadataWriteClient() throws Exception {
+
+    HoodieWriteConfig hoodieWriteConfig = getConfigBuilder()
+        
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).withMetadataIndexColumnStats(false).withEnableRecordIndex(true)
+            .withRecordIndexFileGroupCount(1, 1).build()).build();
+
+    // trigger end to end write to data table so that metadata table is also 
initialized.
+    initDataTableWithACommit(hoodieWriteConfig);
+
+    // fetch metadata file slice info
+    HoodieWriteConfig mdtWriteConfig = 
HoodieMetadataWriteUtils.createMetadataWriteConfig(hoodieWriteConfig, 
HoodieFailedWritesCleaningPolicy.EAGER);
+    Map<String, List<String>> mdtPartitionsFileIdMapping = new HashMap<>();
+    List<HoodieFileGroupId> nonFilesPartitionFileGroupIdList = new 
ArrayList<>();
+    List<HoodieFileGroupId> filesPartitionFileGroupIdList = new ArrayList<>();
+    HoodieTableMetaClient metadataMetaClient = 
HoodieTableMetaClient.builder().setBasePath(mdtWriteConfig.getBasePath()).setConf(context.getStorageConf()).build();
+    fetchMetadataFileSliceInfo(metadataMetaClient, 
filesPartitionFileGroupIdList, nonFilesPartitionFileGroupIdList, 
mdtPartitionsFileIdMapping);
+
+    List<HoodieRecord> filesPartitionExpectedRecords = new ArrayList<>();
+    Map<String, HoodieRecord> filesPartitionExpectedRecordsMap = new 
HashMap<>();
+    List<String> expectedAllPartitions = new ArrayList<>();
+    List<HoodieRecord> rliRecords = new ArrayList<>();
+    Map<String, HoodieRecord> rliPartitionExpectedRecordsMap = new HashMap<>();
+    String commitTimeOfInterest = null;
+
+    // create Write client to SparkRDDMetadataWriteClient and trigger writes.
+    try (SparkRDDMetadataWriteClient client = new 
SparkRDDMetadataWriteClient(context, mdtWriteConfig)) {
+      commitTimeOfInterest = client.createNewInstantTime();
+
+      // prepare FILES partition records.
+      
prepareFilesPartitionRecords(mdtPartitionsFileIdMapping.get(MetadataPartitionType.FILES.getPartitionPath()).get(0),
+          commitTimeOfInterest, filesPartitionExpectedRecords, 
filesPartitionExpectedRecordsMap, expectedAllPartitions);
+
+      // prepare RLI records.
+      prepareRliRecords(commitTimeOfInterest, mdtPartitionsFileIdMapping, 
expectedAllPartitions, rliRecords, rliPartitionExpectedRecordsMap);
+
+      // ingest RLI records to metadata table.
+      client.startCommitForMetadataTable(metadataMetaClient, 
commitTimeOfInterest, DELTA_COMMIT_ACTION);
+      JavaRDD<WriteStatus> partialWriteStatusesRDD = 
client.upsertPreppedRecords(jsc.parallelize(rliRecords), commitTimeOfInterest, 
Option.of(nonFilesPartitionFileGroupIdList));
+      List<WriteStatus> partialWriteStatuses = 
partialWriteStatusesRDD.collect();
+
+      // validate that the commit is still pending since we are streaming 
write to metadata table.
+      HoodieActiveTimeline reloadedMdtActiveTimeline = 
metadataMetaClient.reloadActiveTimeline();
+      
assertEquals(reloadedMdtActiveTimeline.filterCompletedInstants().getInstants().stream().count(),
 3); // files, rli instantiation and 1 write to data table.
+      String finalCommitTimeOfInterest = commitTimeOfInterest;
+      
assertTrue(reloadedMdtActiveTimeline.filterInflightsAndRequested().getInstants().stream().anyMatch(instant
 -> instant.requestedTime().equals(finalCommitTimeOfInterest)));
+
+      // write to FILES partition
+      JavaRDD<WriteStatus> filePartitionWriteStatusesRDD = 
client.upsertPreppedRecords(jsc.parallelize(filesPartitionExpectedRecords), 
commitTimeOfInterest, Option.of(filesPartitionFileGroupIdList));
+      List<WriteStatus> filesPartitionWriteStatus = 
filePartitionWriteStatusesRDD.collect();
+      List<HoodieWriteStat> allWriteStats = new ArrayList<>();
+      allWriteStats.addAll(partialWriteStatuses.stream().map(writeStatus -> 
writeStatus.getStat()).collect(Collectors.toList()));
+      allWriteStats.addAll(filesPartitionWriteStatus.stream().map(writeStatus 
-> writeStatus.getStat()).collect(Collectors.toList()));
+      client.commitStats(commitTimeOfInterest, allWriteStats, Option.empty(), 
DELTA_COMMIT_ACTION);
+    }
+
+    // validate
+    readFromMetadataTableAndValidateRecords(metadataMetaClient, 
hoodieWriteConfig, filesPartitionExpectedRecordsMap, 
rliPartitionExpectedRecordsMap, commitTimeOfInterest);
+  }
+
+  private void initDataTableWithACommit(HoodieWriteConfig hoodieWriteConfig) 
throws Exception {
+    try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig)) 
{
+      final int numRecords = 100;
+      String newCommitTime = client.createNewInstantTime();
+      insertBatch(hoodieWriteConfig, client, newCommitTime, 
HoodieTimeline.INIT_INSTANT_TS, numRecords, SparkRDDWriteClient::insert,
+          false, true, numRecords, numRecords, 1, Option.empty(), 
INSTANT_GENERATOR);
+    }
+  }
+
+  private void fetchMetadataFileSliceInfo(HoodieTableMetaClient 
metadataMetaClient, List<HoodieFileGroupId> filesPartitionFileGroupIdList,

Review Comment:
   I had to return two to three lists/maps from this method, so I took this route of populating caller-supplied collections instead.
   



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDMetadataWriteClient.java:
##########
@@ -0,0 +1,414 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.avro.model.HoodieMetadataRecord;
+import org.apache.hudi.common.config.HoodieCommonConfig;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.io.storage.HoodieIOFactory;
+import org.apache.hudi.io.storage.HoodieSeekingFileReader;
+import org.apache.hudi.metadata.HoodieMetadataLogRecordReader;
+import org.apache.hudi.metadata.HoodieMetadataPayload;
+import org.apache.hudi.metadata.HoodieMetadataWriteUtils;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.testutils.HoodieClientTestBase;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.api.java.JavaRDD;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION;
+import static 
org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR;
+import static org.apache.hudi.common.util.CollectionUtils.toStream;
+import static 
org.apache.hudi.common.util.ConfigUtils.DEFAULT_HUDI_CONFIG_FOR_READER;
+import static 
org.apache.hudi.metadata.HoodieTableMetadata.RECORDKEY_PARTITION_LIST;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestSparkRDDMetadataWriteClient extends HoodieClientTestBase {
+
+  private Random random;
+
+  @BeforeEach
+  public void setUp() throws Exception {
+    super.setUp();
+    random = new Random(0xDEED);
+  }
+
+  @Test
+  public void testWritesViaMetadataWriteClient() throws Exception {
+
+    HoodieWriteConfig hoodieWriteConfig = getConfigBuilder()
+        
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).withMetadataIndexColumnStats(false).withEnableRecordIndex(true)
+            .withRecordIndexFileGroupCount(1, 1).build()).build();
+
+    // trigger end to end write to data table so that metadata table is also 
initialized.
+    initDataTableWithACommit(hoodieWriteConfig);
+
+    // fetch metadata file slice info
+    HoodieWriteConfig mdtWriteConfig = 
HoodieMetadataWriteUtils.createMetadataWriteConfig(hoodieWriteConfig, 
HoodieFailedWritesCleaningPolicy.EAGER);
+    Map<String, List<String>> mdtPartitionsFileIdMapping = new HashMap<>();
+    List<HoodieFileGroupId> nonFilesPartitionFileGroupIdList = new 
ArrayList<>();
+    List<HoodieFileGroupId> filesPartitionFileGroupIdList = new ArrayList<>();
+    HoodieTableMetaClient metadataMetaClient = 
HoodieTableMetaClient.builder().setBasePath(mdtWriteConfig.getBasePath()).setConf(context.getStorageConf()).build();
+    fetchMetadataFileSliceInfo(metadataMetaClient, 
filesPartitionFileGroupIdList, nonFilesPartitionFileGroupIdList, 
mdtPartitionsFileIdMapping);
+
+    List<HoodieRecord> filesPartitionExpectedRecords = new ArrayList<>();
+    Map<String, HoodieRecord> filesPartitionExpectedRecordsMap = new 
HashMap<>();
+    List<String> expectedAllPartitions = new ArrayList<>();
+    List<HoodieRecord> rliRecords = new ArrayList<>();
+    Map<String, HoodieRecord> rliPartitionExpectedRecordsMap = new HashMap<>();
+    String commitTimeOfInterest = null;
+
+    // create Write client to SparkRDDMetadataWriteClient and trigger writes.
+    try (SparkRDDMetadataWriteClient client = new 
SparkRDDMetadataWriteClient(context, mdtWriteConfig)) {
+      commitTimeOfInterest = client.createNewInstantTime();
+
+      // prepare FILES partition records.
+      
prepareFilesPartitionRecords(mdtPartitionsFileIdMapping.get(MetadataPartitionType.FILES.getPartitionPath()).get(0),
+          commitTimeOfInterest, filesPartitionExpectedRecords, 
filesPartitionExpectedRecordsMap, expectedAllPartitions);
+
+      // prepare RLI records.
+      prepareRliRecords(commitTimeOfInterest, mdtPartitionsFileIdMapping, 
expectedAllPartitions, rliRecords, rliPartitionExpectedRecordsMap);
+
+      // ingest RLI records to metadata table.
+      client.startCommitForMetadataTable(metadataMetaClient, 
commitTimeOfInterest, DELTA_COMMIT_ACTION);
+      JavaRDD<WriteStatus> partialWriteStatusesRDD = 
client.upsertPreppedRecords(jsc.parallelize(rliRecords), commitTimeOfInterest, 
Option.of(nonFilesPartitionFileGroupIdList));
+      List<WriteStatus> partialWriteStatuses = 
partialWriteStatusesRDD.collect();
+
+      // validate that the commit is still pending since we are streaming 
write to metadata table.
+      HoodieActiveTimeline reloadedMdtActiveTimeline = 
metadataMetaClient.reloadActiveTimeline();
+      
assertEquals(reloadedMdtActiveTimeline.filterCompletedInstants().getInstants().stream().count(),
 3); // files, rli instantiation and 1 write to data table.
+      String finalCommitTimeOfInterest = commitTimeOfInterest;
+      
assertTrue(reloadedMdtActiveTimeline.filterInflightsAndRequested().getInstants().stream().anyMatch(instant
 -> instant.requestedTime().equals(finalCommitTimeOfInterest)));
+
+      // write to FILES partition
+      JavaRDD<WriteStatus> filePartitionWriteStatusesRDD = 
client.upsertPreppedRecords(jsc.parallelize(filesPartitionExpectedRecords), 
commitTimeOfInterest, Option.of(filesPartitionFileGroupIdList));
+      List<WriteStatus> filesPartitionWriteStatus = 
filePartitionWriteStatusesRDD.collect();
+      List<HoodieWriteStat> allWriteStats = new ArrayList<>();
+      allWriteStats.addAll(partialWriteStatuses.stream().map(writeStatus -> 
writeStatus.getStat()).collect(Collectors.toList()));
+      allWriteStats.addAll(filesPartitionWriteStatus.stream().map(writeStatus 
-> writeStatus.getStat()).collect(Collectors.toList()));
+      client.commitStats(commitTimeOfInterest, allWriteStats, Option.empty(), 
DELTA_COMMIT_ACTION);
+    }
+
+    // validate
+    readFromMetadataTableAndValidateRecords(metadataMetaClient, 
hoodieWriteConfig, filesPartitionExpectedRecordsMap, 
rliPartitionExpectedRecordsMap, commitTimeOfInterest);
+  }
+
+  private void initDataTableWithACommit(HoodieWriteConfig hoodieWriteConfig) 
throws Exception {
+    try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig)) 
{
+      final int numRecords = 100;
+      String newCommitTime = client.createNewInstantTime();
+      insertBatch(hoodieWriteConfig, client, newCommitTime, 
HoodieTimeline.INIT_INSTANT_TS, numRecords, SparkRDDWriteClient::insert,
+          false, true, numRecords, numRecords, 1, Option.empty(), 
INSTANT_GENERATOR);
+    }
+  }
+
+  private void fetchMetadataFileSliceInfo(HoodieTableMetaClient 
metadataMetaClient, List<HoodieFileGroupId> filesPartitionFileGroupIdList,
+                                          List<HoodieFileGroupId> 
nonFilesPartitionsFileGroupIdList, Map<String, List<String>> 
mdtPartitionsFileIdMapping) {
+    try (HoodieTableFileSystemView fsView = 
HoodieTableMetadataUtil.getFileSystemViewForMetadataTable(metadataMetaClient)) {
+      List<FileSlice> fileSlices =
+          
HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient, 
Option.ofNullable(fsView), MetadataPartitionType.FILES.getPartitionPath());
+      
mdtPartitionsFileIdMapping.put(MetadataPartitionType.FILES.getPartitionPath(), 
fileSlices.stream().map(fileSlice -> 
fileSlice.getFileId()).collect(Collectors.toList()));
+      fileSlices.stream().forEach(fileSlice -> {
+        filesPartitionFileGroupIdList.add(new 
HoodieFileGroupId(MetadataPartitionType.FILES.getPartitionPath(), 
fileSlice.getFileId()));
+      });
+
+      fileSlices =
+          
HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient, 
Option.ofNullable(fsView), 
MetadataPartitionType.RECORD_INDEX.getPartitionPath());
+      
mdtPartitionsFileIdMapping.put(MetadataPartitionType.RECORD_INDEX.getPartitionPath(),
 fileSlices.stream().map(fileSlice -> 
fileSlice.getFileId()).collect(Collectors.toList()));
+      fileSlices.stream().forEach(fileSlice -> {
+        nonFilesPartitionsFileGroupIdList.add(new 
HoodieFileGroupId(MetadataPartitionType.RECORD_INDEX.getPartitionPath(), 
fileSlice.getFileId()));
+      });
+    }
+  }
+
+  private void readFromMetadataTableAndValidateRecords(HoodieTableMetaClient 
metadataMetaClient, HoodieWriteConfig hoodieWriteConfig,
+                                                       Map<String, 
HoodieRecord> filesPartitionExpectedRecordsMap,
+                                                       Map<String, 
HoodieRecord> rliPartitionExpectedRecordsMap,
+                                                       String 
validMetadataInstant) throws IOException {
+    // read from MDT and validate all records.
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    metadataMetaClient = HoodieTableMetaClient.reload(metadataMetaClient);
+    try (HoodieTableFileSystemView fsView = 
HoodieTableMetadataUtil.getFileSystemViewForMetadataTable(metadataMetaClient)) {
+      FileSlice filesFileSlice =
+          
HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient, 
Option.ofNullable(fsView), 
MetadataPartitionType.FILES.getPartitionPath()).get(0);
+      readFromMDTFileSliceAndValidate(metadataMetaClient, hoodieWriteConfig, 
filesFileSlice, filesPartitionExpectedRecordsMap, validMetadataInstant);
+
+      FileSlice rliFileSlice =
+          
HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient, 
Option.ofNullable(fsView), 
MetadataPartitionType.RECORD_INDEX.getPartitionPath()).get(0);
+      readFromMDTFileSliceAndValidate(metadataMetaClient, hoodieWriteConfig, 
rliFileSlice, rliPartitionExpectedRecordsMap, validMetadataInstant);
+    }
+  }
+
+  private void readFromMDTFileSliceAndValidate(HoodieTableMetaClient 
metadataMetaClient, HoodieWriteConfig hoodieWriteConfig, FileSlice fileSlice, 
Map<String, HoodieRecord> expectedRecordsMap,
+                                               String validMetadataInstant)
+      throws IOException {
+    // open readers
+    Pair<HoodieSeekingFileReader<?>, HoodieMetadataLogRecordReader> readers = 
openReaders(MetadataPartitionType.FILES.getPartitionPath(), fileSlice, 
metaClient, metadataMetaClient, hoodieWriteConfig,
+        validMetadataInstant);
+    try {
+      // read from the file slice of interest.
+      List<String> sortedKeysForFilesPartition = new 
ArrayList<>(expectedRecordsMap.keySet());
+      Collections.sort(sortedKeysForFilesPartition);
+
+      Map<String, HoodieRecord<HoodieMetadataPayload>> logRecords = 
readLogRecords(readers.getRight(), sortedKeysForFilesPartition);
+
+      Map<String, HoodieRecord<HoodieMetadataPayload>> actualMdtRecordMap =
+          readFromBaseAndMergeWithLogRecords(readers.getKey(), 
sortedKeysForFilesPartition, logRecords, fileSlice.getPartitionPath());
+      assertEquals(actualMdtRecordMap.size(), expectedRecordsMap.size());
+      actualMdtRecordMap.forEach((k, v) -> {

Review Comment:
   ack



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDMetadataWriteClient.java:
##########
@@ -0,0 +1,414 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.avro.model.HoodieMetadataRecord;
+import org.apache.hudi.common.config.HoodieCommonConfig;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.io.storage.HoodieIOFactory;
+import org.apache.hudi.io.storage.HoodieSeekingFileReader;
+import org.apache.hudi.metadata.HoodieMetadataLogRecordReader;
+import org.apache.hudi.metadata.HoodieMetadataPayload;
+import org.apache.hudi.metadata.HoodieMetadataWriteUtils;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.testutils.HoodieClientTestBase;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.api.java.JavaRDD;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION;
+import static 
org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR;
+import static org.apache.hudi.common.util.CollectionUtils.toStream;
+import static 
org.apache.hudi.common.util.ConfigUtils.DEFAULT_HUDI_CONFIG_FOR_READER;
+import static 
org.apache.hudi.metadata.HoodieTableMetadata.RECORDKEY_PARTITION_LIST;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestSparkRDDMetadataWriteClient extends HoodieClientTestBase {
+
+  private Random random;
+
+  @BeforeEach
+  public void setUp() throws Exception {
+    super.setUp();
+    random = new Random(0xDEED);
+  }
+
+  @Test
+  public void testWritesViaMetadataWriteClient() throws Exception {
+
+    HoodieWriteConfig hoodieWriteConfig = getConfigBuilder()
+        
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).withMetadataIndexColumnStats(false).withEnableRecordIndex(true)
+            .withRecordIndexFileGroupCount(1, 1).build()).build();
+
+    // trigger end to end write to data table so that metadata table is also 
initialized.
+    initDataTableWithACommit(hoodieWriteConfig);
+
+    // fetch metadata file slice info
+    HoodieWriteConfig mdtWriteConfig = 
HoodieMetadataWriteUtils.createMetadataWriteConfig(hoodieWriteConfig, 
HoodieFailedWritesCleaningPolicy.EAGER);
+    Map<String, List<String>> mdtPartitionsFileIdMapping = new HashMap<>();
+    List<HoodieFileGroupId> nonFilesPartitionFileGroupIdList = new 
ArrayList<>();
+    List<HoodieFileGroupId> filesPartitionFileGroupIdList = new ArrayList<>();
+    HoodieTableMetaClient metadataMetaClient = 
HoodieTableMetaClient.builder().setBasePath(mdtWriteConfig.getBasePath()).setConf(context.getStorageConf()).build();
+    fetchMetadataFileSliceInfo(metadataMetaClient, 
filesPartitionFileGroupIdList, nonFilesPartitionFileGroupIdList, 
mdtPartitionsFileIdMapping);
+
+    List<HoodieRecord> filesPartitionExpectedRecords = new ArrayList<>();
+    Map<String, HoodieRecord> filesPartitionExpectedRecordsMap = new 
HashMap<>();
+    List<String> expectedAllPartitions = new ArrayList<>();
+    List<HoodieRecord> rliRecords = new ArrayList<>();
+    Map<String, HoodieRecord> rliPartitionExpectedRecordsMap = new HashMap<>();
+    String commitTimeOfInterest = null;
+
+    // create Write client to SparkRDDMetadataWriteClient and trigger writes.
+    try (SparkRDDMetadataWriteClient client = new 
SparkRDDMetadataWriteClient(context, mdtWriteConfig)) {
+      commitTimeOfInterest = client.createNewInstantTime();
+
+      // prepare FILES partition records.
+      
prepareFilesPartitionRecords(mdtPartitionsFileIdMapping.get(MetadataPartitionType.FILES.getPartitionPath()).get(0),
+          commitTimeOfInterest, filesPartitionExpectedRecords, 
filesPartitionExpectedRecordsMap, expectedAllPartitions);
+
+      // prepare RLI records.
+      prepareRliRecords(commitTimeOfInterest, mdtPartitionsFileIdMapping, 
expectedAllPartitions, rliRecords, rliPartitionExpectedRecordsMap);
+
+      // ingest RLI records to metadata table.
+      client.startCommitForMetadataTable(metadataMetaClient, 
commitTimeOfInterest, DELTA_COMMIT_ACTION);
+      JavaRDD<WriteStatus> partialWriteStatusesRDD = 
client.upsertPreppedRecords(jsc.parallelize(rliRecords), commitTimeOfInterest, 
Option.of(nonFilesPartitionFileGroupIdList));
+      List<WriteStatus> partialWriteStatuses = 
partialWriteStatusesRDD.collect();
+
+      // validate that the commit is still pending since we are streaming 
write to metadata table.
+      HoodieActiveTimeline reloadedMdtActiveTimeline = 
metadataMetaClient.reloadActiveTimeline();
+      
assertEquals(reloadedMdtActiveTimeline.filterCompletedInstants().getInstants().stream().count(),
 3); // files, rli instantiaton and 1 write to data table.
+      String finalCommitTimeOfInterest = commitTimeOfInterest;
+      
assertTrue(reloadedMdtActiveTimeline.filterInflightsAndRequested().getInstants().stream().anyMatch(instant
 -> instant.requestedTime().equals(finalCommitTimeOfInterest)));
+
+      // write to FILES partition
+      JavaRDD<WriteStatus> filePartitionWriteStatusesRDD = 
client.upsertPreppedRecords(jsc.parallelize(filesPartitionExpectedRecords), 
commitTimeOfInterest, Option.of(filesPartitionFileGroupIdList));
+      List<WriteStatus> filesPartitionWriteStatus = 
filePartitionWriteStatusesRDD.collect();
+      List<HoodieWriteStat> allWriteStats = new ArrayList<>();
+      allWriteStats.addAll(partialWriteStatuses.stream().map(writeStatus -> 
writeStatus.getStat()).collect(Collectors.toList()));
+      allWriteStats.addAll(filesPartitionWriteStatus.stream().map(writeStatus 
-> writeStatus.getStat()).collect(Collectors.toList()));
+      client.commitStats(commitTimeOfInterest, allWriteStats, Option.empty(), 
DELTA_COMMIT_ACTION);
+    }
+
+    // validate
+    readFromMetadataTableAndValidateRecords(metadataMetaClient, 
hoodieWriteConfig, filesPartitionExpectedRecordsMap, 
rliPartitionExpectedRecordsMap, commitTimeOfInterest);
+  }
+
+  private void initDataTableWithACommit(HoodieWriteConfig hoodieWriteConfig) 
throws Exception {
+    try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig)) 
{
+      final int numRecords = 100;
+      String newCommitTime = client.createNewInstantTime();
+      insertBatch(hoodieWriteConfig, client, newCommitTime, 
HoodieTimeline.INIT_INSTANT_TS, numRecords, SparkRDDWriteClient::insert,
+          false, true, numRecords, numRecords, 1, Option.empty(), 
INSTANT_GENERATOR);
+    }
+  }
+
+  private void fetchMetadataFileSliceInfo(HoodieTableMetaClient 
metadataMetaClient, List<HoodieFileGroupId> filesPartitionFileGroupIdList,
+                                          List<HoodieFileGroupId> 
nonFilesPartitionsFileGroupIdList, Map<String, List<String>> 
mdtPartitionsFileIdMapping) {
+    try (HoodieTableFileSystemView fsView = 
HoodieTableMetadataUtil.getFileSystemViewForMetadataTable(metadataMetaClient)) {
+      List<FileSlice> fileSlices =
+          
HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient, 
Option.ofNullable(fsView), MetadataPartitionType.FILES.getPartitionPath());
+      
mdtPartitionsFileIdMapping.put(MetadataPartitionType.FILES.getPartitionPath(), 
fileSlices.stream().map(fileSlice -> 
fileSlice.getFileId()).collect(Collectors.toList()));
+      fileSlices.stream().forEach(fileSlice -> {
+        filesPartitionFileGroupIdList.add(new 
HoodieFileGroupId(MetadataPartitionType.FILES.getPartitionPath(), 
fileSlice.getFileId()));
+      });
+
+      fileSlices =
+          
HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient, 
Option.ofNullable(fsView), 
MetadataPartitionType.RECORD_INDEX.getPartitionPath());
+      
mdtPartitionsFileIdMapping.put(MetadataPartitionType.RECORD_INDEX.getPartitionPath(),
 fileSlices.stream().map(fileSlice -> 
fileSlice.getFileId()).collect(Collectors.toList()));
+      fileSlices.stream().forEach(fileSlice -> {
+        nonFilesPartitionsFileGroupIdList.add(new 
HoodieFileGroupId(MetadataPartitionType.RECORD_INDEX.getPartitionPath(), 
fileSlice.getFileId()));
+      });
+    }
+  }
+
+  private void readFromMetadataTableAndValidateRecords(HoodieTableMetaClient 
metadataMetaClient, HoodieWriteConfig hoodieWriteConfig,
+                                                       Map<String, 
HoodieRecord> filesPartitionExpectedRecordsMap,
+                                                       Map<String, 
HoodieRecord> rliPartitionExpectedRecordsMap,
+                                                       String 
validMetadataInstant) throws IOException {
+    // read from MDT and validate all records.
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    metadataMetaClient = HoodieTableMetaClient.reload(metadataMetaClient);
+    try (HoodieTableFileSystemView fsView = 
HoodieTableMetadataUtil.getFileSystemViewForMetadataTable(metadataMetaClient)) {
+      FileSlice filesFileSlice =
+          
HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient, 
Option.ofNullable(fsView), 
MetadataPartitionType.FILES.getPartitionPath()).get(0);
+      readFromMDTFileSliceAndValidate(metadataMetaClient, hoodieWriteConfig, 
filesFileSlice, filesPartitionExpectedRecordsMap, validMetadataInstant);
+
+      FileSlice rliFileSlice =
+          
HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient, 
Option.ofNullable(fsView), 
MetadataPartitionType.RECORD_INDEX.getPartitionPath()).get(0);
+      readFromMDTFileSliceAndValidate(metadataMetaClient, hoodieWriteConfig, 
rliFileSlice, rliPartitionExpectedRecordsMap, validMetadataInstant);
+    }
+  }
+
+  private void readFromMDTFileSliceAndValidate(HoodieTableMetaClient 
metadataMetaClient, HoodieWriteConfig hoodieWriteConfig, FileSlice fileSlice, 
Map<String, HoodieRecord> expectedRecordsMap,
+                                               String validMetadataInstant)
+      throws IOException {
+    // open readers
+    Pair<HoodieSeekingFileReader<?>, HoodieMetadataLogRecordReader> readers = 
openReaders(MetadataPartitionType.FILES.getPartitionPath(), fileSlice, 
metaClient, metadataMetaClient, hoodieWriteConfig,
+        validMetadataInstant);
+    try {
+      // read from the file slice of interest.
+      List<String> sortedKeysForFilesPartition = new 
ArrayList<>(expectedRecordsMap.keySet());
+      Collections.sort(sortedKeysForFilesPartition);
+
+      Map<String, HoodieRecord<HoodieMetadataPayload>> logRecords = 
readLogRecords(readers.getRight(), sortedKeysForFilesPartition);
+
+      Map<String, HoodieRecord<HoodieMetadataPayload>> actualMdtRecordMap =
+          readFromBaseAndMergeWithLogRecords(readers.getKey(), 
sortedKeysForFilesPartition, logRecords, fileSlice.getPartitionPath());
+      assertEquals(actualMdtRecordMap.size(), expectedRecordsMap.size());
+      actualMdtRecordMap.forEach((k, v) -> {
+        assertTrue(expectedRecordsMap.containsKey(k));
+        if (!k.equals(RECORDKEY_PARTITION_LIST)) {
+          // ignore __all_partition_records sinec it could have partitions 
from first commit which could be from HoodieTestDatagenerator.
+          assertEquals(((HoodieMetadataPayload) 
expectedRecordsMap.get(k).getData()).getFilenames(), ((HoodieMetadataPayload) 
v.getData()).getFilenames());
+        }
+      });
+    } finally {
+      if (readers.getKey() != null) {
+        readers.getKey().close();
+      }
+      if (readers.getValue() != null) {
+        readers.getValue().close();
+      }
+    }
+  }
+
+  private Map<String, HoodieRecord<HoodieMetadataPayload>> 
readLogRecords(HoodieMetadataLogRecordReader logRecordReader,
+                                                                          
List<String> sortedKeys) {
+    if (logRecordReader == null) {
+      return Collections.emptyMap();
+    }
+
+    return logRecordReader.getRecordsByKeys(sortedKeys);
+  }
+
+  private Map<String, HoodieRecord<HoodieMetadataPayload>> 
readFromBaseAndMergeWithLogRecords(HoodieSeekingFileReader<?> reader,
+                                                                               
               List<String> sortedKeys,
+                                                                               
               Map<String, HoodieRecord<HoodieMetadataPayload>> logRecords,
+                                                                               
               String partitionName) throws IOException {
+    if (reader == null) {
+      // No base file at all
+      return logRecords;
+    }
+
+    Map<String, HoodieRecord<HoodieMetadataPayload>> records =
+        fetchBaseFileRecordsByKeys(reader, sortedKeys, partitionName);
+
+    // Iterate over all provided log-records, merging them into existing 
records
+    logRecords.values().forEach(logRecord ->
+        records.merge(
+            logRecord.getRecordKey(),
+            logRecord,
+            (oldRecord, newRecord) -> {
+              HoodieMetadataPayload mergedPayload = 
newRecord.getData().preCombine(oldRecord.getData());
+              return mergedPayload.isDeleted() ? null : new 
HoodieAvroRecord<>(oldRecord.getKey(), mergedPayload);
+            }
+        ));
+
+    return records;
+  }
+
+  @SuppressWarnings("unchecked")
+  private Map<String, HoodieRecord<HoodieMetadataPayload>> 
fetchBaseFileRecordsByKeys(HoodieSeekingFileReader reader,
+                                                                               
       List<String> sortedKeys,
+                                                                               
       String partitionName) throws IOException {
+    Map<String, HoodieRecord<HoodieMetadataPayload>> result;
+    try (ClosableIterator<HoodieRecord<?>> records = 
reader.getRecordsByKeysIterator(sortedKeys)) {
+      result = toStream(records)
+          .map(record -> {
+            GenericRecord data = (GenericRecord) record.getData();
+            // populateMetaFields is hardcoded to false for the metadata table 
so key must be extracted from the `key` field
+            String recordKey = (String) 
data.get(HoodieMetadataPayload.KEY_FIELD_NAME);
+            return Pair.of(recordKey, composeRecord(data, recordKey, 
partitionName));
+          })
+          .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
+    }
+    return result;
+  }
+
+  private HoodieRecord<HoodieMetadataPayload> composeRecord(GenericRecord 
avroRecord, String recordKey, String partitionName) {
+    return new HoodieAvroRecord<>(new HoodieKey(recordKey, partitionName),
+        new HoodieMetadataPayload(avroRecord, 0L), null);
+  }
+
+  private Pair<HoodieSeekingFileReader<?>, HoodieMetadataLogRecordReader> 
openReaders(String partitionName, FileSlice slice, HoodieTableMetaClient 
dataMetaClient,
+                                                                               
       HoodieTableMetaClient metadataMetaClient, HoodieWriteConfig 
datatableWriteConfig,
+                                                                               
       String validMetadataInstant) {
+    try {
+      HoodieSeekingFileReader<?> baseFileReader = getBaseFileReader(slice, 
metadataMetaClient);
+      List<HoodieLogFile> logFiles = 
slice.getLogFiles().collect(Collectors.toList());
+      HoodieMetadataLogRecordReader logRecordScanner = 
getLogRecordScanner(logFiles, partitionName, dataMetaClient, 
metadataMetaClient, datatableWriteConfig.getMetadataConfig(),
+          validMetadataInstant);
+      return Pair.of(baseFileReader, logRecordScanner);
+    } catch (IOException e) {
+      throw new HoodieIOException("Error opening readers for metadata table 
partition " + partitionName, e);
+    }
+  }
+
+  private HoodieSeekingFileReader<?> getBaseFileReader(FileSlice slice, 
HoodieTableMetaClient metadataMetaClient) throws IOException {

Review Comment:
   yes, exactly. In fact, it slipped my mind and I used the FG reader to directly read 
from a file group in MDT and ran into exceptions :) 
   and then had to pull in so much code. 
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to