the-other-tim-brown commented on code in PR #13295:
URL: https://github.com/apache/hudi/pull/13295#discussion_r2112692911


##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDMetadataWriteClient.java:
##########
@@ -0,0 +1,414 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.avro.model.HoodieMetadataRecord;
+import org.apache.hudi.common.config.HoodieCommonConfig;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.io.storage.HoodieIOFactory;
+import org.apache.hudi.io.storage.HoodieSeekingFileReader;
+import org.apache.hudi.metadata.HoodieMetadataLogRecordReader;
+import org.apache.hudi.metadata.HoodieMetadataPayload;
+import org.apache.hudi.metadata.HoodieMetadataWriteUtils;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.testutils.HoodieClientTestBase;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.api.java.JavaRDD;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION;
+import static 
org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR;
+import static org.apache.hudi.common.util.CollectionUtils.toStream;
+import static 
org.apache.hudi.common.util.ConfigUtils.DEFAULT_HUDI_CONFIG_FOR_READER;
+import static 
org.apache.hudi.metadata.HoodieTableMetadata.RECORDKEY_PARTITION_LIST;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestSparkRDDMetadataWriteClient extends HoodieClientTestBase {
+
+  private Random random;
+
+  @BeforeEach
+  public void setUp() throws Exception {
+    super.setUp();
+    random = new Random(0xDEED);
+  }
+
+  @Test
+  public void testWritesViaMetadataWriteClient() throws Exception {
+
+    HoodieWriteConfig hoodieWriteConfig = getConfigBuilder()
+        
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).withMetadataIndexColumnStats(false).withEnableRecordIndex(true)
+            .withRecordIndexFileGroupCount(1, 1).build()).build();
+
+    // trigger end to end write to data table so that metadata table is also 
initialized.
+    initDataTableWithACommit(hoodieWriteConfig);
+
+    // fetch metadata file slice info
+    HoodieWriteConfig mdtWriteConfig = 
HoodieMetadataWriteUtils.createMetadataWriteConfig(hoodieWriteConfig, 
HoodieFailedWritesCleaningPolicy.EAGER);
+    Map<String, List<String>> mdtPartitionsFileIdMapping = new HashMap<>();
+    List<HoodieFileGroupId> nonFilesPartitionFileGroupIdList = new 
ArrayList<>();
+    List<HoodieFileGroupId> filesPartitionFileGroupIdList = new ArrayList<>();
+    HoodieTableMetaClient metadataMetaClient = 
HoodieTableMetaClient.builder().setBasePath(mdtWriteConfig.getBasePath()).setConf(context.getStorageConf()).build();
+    fetchMetadataFileSliceInfo(metadataMetaClient, 
filesPartitionFileGroupIdList, nonFilesPartitionFileGroupIdList, 
mdtPartitionsFileIdMapping);
+
+    List<HoodieRecord> filesPartitionExpectedRecords = new ArrayList<>();
+    Map<String, HoodieRecord> filesPartitionExpectedRecordsMap = new 
HashMap<>();
+    List<String> expectedAllPartitions = new ArrayList<>();
+    List<HoodieRecord> rliRecords = new ArrayList<>();
+    Map<String, HoodieRecord> rliPartitionExpectedRecordsMap = new HashMap<>();
+    String commitTimeOfInterest = null;
+
+    // create Write client to SparkRDDMetadataWriteClient and trigger writes.
+    try (SparkRDDMetadataWriteClient client = new 
SparkRDDMetadataWriteClient(context, mdtWriteConfig)) {
+      commitTimeOfInterest = client.createNewInstantTime();
+
+      // prepare FILES partition records.
+      
prepareFilesPartitionRecords(mdtPartitionsFileIdMapping.get(MetadataPartitionType.FILES.getPartitionPath()).get(0),
+          commitTimeOfInterest, filesPartitionExpectedRecords, 
filesPartitionExpectedRecordsMap, expectedAllPartitions);
+
+      // prepare RLI records.
+      prepareRliRecords(commitTimeOfInterest, mdtPartitionsFileIdMapping, 
expectedAllPartitions, rliRecords, rliPartitionExpectedRecordsMap);
+
+      // ingest RLI records to metadata table.
+      client.startCommitForMetadataTable(metadataMetaClient, 
commitTimeOfInterest, DELTA_COMMIT_ACTION);
+      JavaRDD<WriteStatus> partialWriteStatusesRDD = 
client.upsertPreppedRecords(jsc.parallelize(rliRecords), commitTimeOfInterest, 
Option.of(nonFilesPartitionFileGroupIdList));
+      List<WriteStatus> partialWriteStatuses = 
partialWriteStatusesRDD.collect();
+
+      // validate that the commit is still pending since we are streaming 
write to metadata table.
+      HoodieActiveTimeline reloadedMdtActiveTimeline = 
metadataMetaClient.reloadActiveTimeline();
+      
assertEquals(reloadedMdtActiveTimeline.filterCompletedInstants().getInstants().stream().count(),
 3); // files, rli instantiaton and 1 write to data table.
+      String finalCommitTimeOfInterest = commitTimeOfInterest;
+      
assertTrue(reloadedMdtActiveTimeline.filterInflightsAndRequested().getInstants().stream().anyMatch(instant
 -> instant.requestedTime().equals(finalCommitTimeOfInterest)));
+
+      // write to FILES partition
+      JavaRDD<WriteStatus> filePartitionWriteStatusesRDD = 
client.upsertPreppedRecords(jsc.parallelize(filesPartitionExpectedRecords), 
commitTimeOfInterest, Option.of(filesPartitionFileGroupIdList));
+      List<WriteStatus> filesPartitionWriteStatus = 
filePartitionWriteStatusesRDD.collect();
+      List<HoodieWriteStat> allWriteStats = new ArrayList<>();
+      allWriteStats.addAll(partialWriteStatuses.stream().map(writeStatus -> 
writeStatus.getStat()).collect(Collectors.toList()));
+      allWriteStats.addAll(filesPartitionWriteStatus.stream().map(writeStatus 
-> writeStatus.getStat()).collect(Collectors.toList()));
+      client.commitStats(commitTimeOfInterest, allWriteStats, Option.empty(), 
DELTA_COMMIT_ACTION);
+    }
+
+    // validate
+    readFromMetadataTableAndValidateRecords(metadataMetaClient, 
hoodieWriteConfig, filesPartitionExpectedRecordsMap, 
rliPartitionExpectedRecordsMap, commitTimeOfInterest);
+  }
+
+  private void initDataTableWithACommit(HoodieWriteConfig hoodieWriteConfig) 
throws Exception {
+    try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig)) 
{
+      final int numRecords = 100;
+      String newCommitTime = client.createNewInstantTime();
+      insertBatch(hoodieWriteConfig, client, newCommitTime, 
HoodieTimeline.INIT_INSTANT_TS, numRecords, SparkRDDWriteClient::insert,
+          false, true, numRecords, numRecords, 1, Option.empty(), 
INSTANT_GENERATOR);
+    }
+  }
+
+  private void fetchMetadataFileSliceInfo(HoodieTableMetaClient 
metadataMetaClient, List<HoodieFileGroupId> filesPartitionFileGroupIdList,

Review Comment:
   Nitpick: I prefer to return output objects instead of modifying the inputs in 
place, since it is easier for me to follow when reviewing or modifying code.
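   For example, something along these lines (the holder class and names below are 
purely illustrative, not existing Hudi types):

   ```java
   // Illustrative only: bundle the outputs in a small holder and return it from
   // fetchMetadataFileSliceInfo, rather than mutating collections passed in by the caller.
   private static final class MetadataFileSliceInfo {
     final List<HoodieFileGroupId> filesPartitionFileGroupIds;
     final List<HoodieFileGroupId> nonFilesPartitionFileGroupIds;
     final Map<String, List<String>> partitionToFileIds;

     MetadataFileSliceInfo(List<HoodieFileGroupId> filesPartitionFileGroupIds,
                           List<HoodieFileGroupId> nonFilesPartitionFileGroupIds,
                           Map<String, List<String>> partitionToFileIds) {
       this.filesPartitionFileGroupIds = filesPartitionFileGroupIds;
       this.nonFilesPartitionFileGroupIds = nonFilesPartitionFileGroupIds;
       this.partitionToFileIds = partitionToFileIds;
     }
   }

   // Caller side then just reads from the returned object:
   // MetadataFileSliceInfo sliceInfo = fetchMetadataFileSliceInfo(metadataMetaClient);
   ```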



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDMetadataWriteClient.java:
##########
@@ -0,0 +1,414 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.avro.model.HoodieMetadataRecord;
+import org.apache.hudi.common.config.HoodieCommonConfig;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.io.storage.HoodieIOFactory;
+import org.apache.hudi.io.storage.HoodieSeekingFileReader;
+import org.apache.hudi.metadata.HoodieMetadataLogRecordReader;
+import org.apache.hudi.metadata.HoodieMetadataPayload;
+import org.apache.hudi.metadata.HoodieMetadataWriteUtils;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.testutils.HoodieClientTestBase;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.api.java.JavaRDD;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION;
+import static 
org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR;
+import static org.apache.hudi.common.util.CollectionUtils.toStream;
+import static 
org.apache.hudi.common.util.ConfigUtils.DEFAULT_HUDI_CONFIG_FOR_READER;
+import static 
org.apache.hudi.metadata.HoodieTableMetadata.RECORDKEY_PARTITION_LIST;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestSparkRDDMetadataWriteClient extends HoodieClientTestBase {
+
+  private Random random;
+
+  @BeforeEach
+  public void setUp() throws Exception {
+    super.setUp();
+    random = new Random(0xDEED);
+  }
+
+  @Test
+  public void testWritesViaMetadataWriteClient() throws Exception {
+
+    HoodieWriteConfig hoodieWriteConfig = getConfigBuilder()
+        
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).withMetadataIndexColumnStats(false).withEnableRecordIndex(true)
+            .withRecordIndexFileGroupCount(1, 1).build()).build();
+
+    // trigger end to end write to data table so that metadata table is also 
initialized.
+    initDataTableWithACommit(hoodieWriteConfig);
+
+    // fetch metadata file slice info
+    HoodieWriteConfig mdtWriteConfig = 
HoodieMetadataWriteUtils.createMetadataWriteConfig(hoodieWriteConfig, 
HoodieFailedWritesCleaningPolicy.EAGER);
+    Map<String, List<String>> mdtPartitionsFileIdMapping = new HashMap<>();
+    List<HoodieFileGroupId> nonFilesPartitionFileGroupIdList = new 
ArrayList<>();
+    List<HoodieFileGroupId> filesPartitionFileGroupIdList = new ArrayList<>();
+    HoodieTableMetaClient metadataMetaClient = 
HoodieTableMetaClient.builder().setBasePath(mdtWriteConfig.getBasePath()).setConf(context.getStorageConf()).build();
+    fetchMetadataFileSliceInfo(metadataMetaClient, 
filesPartitionFileGroupIdList, nonFilesPartitionFileGroupIdList, 
mdtPartitionsFileIdMapping);
+
+    List<HoodieRecord> filesPartitionExpectedRecords = new ArrayList<>();
+    Map<String, HoodieRecord> filesPartitionExpectedRecordsMap = new 
HashMap<>();
+    List<String> expectedAllPartitions = new ArrayList<>();
+    List<HoodieRecord> rliRecords = new ArrayList<>();
+    Map<String, HoodieRecord> rliPartitionExpectedRecordsMap = new HashMap<>();
+    String commitTimeOfInterest = null;
+
+    // create Write client to SparkRDDMetadataWriteClient and trigger writes.
+    try (SparkRDDMetadataWriteClient client = new 
SparkRDDMetadataWriteClient(context, mdtWriteConfig)) {
+      commitTimeOfInterest = client.createNewInstantTime();
+
+      // prepare FILES partition records.
+      
prepareFilesPartitionRecords(mdtPartitionsFileIdMapping.get(MetadataPartitionType.FILES.getPartitionPath()).get(0),
+          commitTimeOfInterest, filesPartitionExpectedRecords, 
filesPartitionExpectedRecordsMap, expectedAllPartitions);
+
+      // prepare RLI records.
+      prepareRliRecords(commitTimeOfInterest, mdtPartitionsFileIdMapping, 
expectedAllPartitions, rliRecords, rliPartitionExpectedRecordsMap);
+
+      // ingest RLI records to metadata table.
+      client.startCommitForMetadataTable(metadataMetaClient, 
commitTimeOfInterest, DELTA_COMMIT_ACTION);
+      JavaRDD<WriteStatus> partialWriteStatusesRDD = 
client.upsertPreppedRecords(jsc.parallelize(rliRecords), commitTimeOfInterest, 
Option.of(nonFilesPartitionFileGroupIdList));
+      List<WriteStatus> partialWriteStatuses = 
partialWriteStatusesRDD.collect();
+
+      // validate that the commit is still pending since we are streaming 
write to metadata table.
+      HoodieActiveTimeline reloadedMdtActiveTimeline = 
metadataMetaClient.reloadActiveTimeline();
+      
assertEquals(reloadedMdtActiveTimeline.filterCompletedInstants().getInstants().stream().count(),
 3); // files, rli instantiaton and 1 write to data table.
+      String finalCommitTimeOfInterest = commitTimeOfInterest;
+      
assertTrue(reloadedMdtActiveTimeline.filterInflightsAndRequested().getInstants().stream().anyMatch(instant
 -> instant.requestedTime().equals(finalCommitTimeOfInterest)));
+
+      // write to FILES partition
+      JavaRDD<WriteStatus> filePartitionWriteStatusesRDD = 
client.upsertPreppedRecords(jsc.parallelize(filesPartitionExpectedRecords), 
commitTimeOfInterest, Option.of(filesPartitionFileGroupIdList));
+      List<WriteStatus> filesPartitionWriteStatus = 
filePartitionWriteStatusesRDD.collect();
+      List<HoodieWriteStat> allWriteStats = new ArrayList<>();
+      allWriteStats.addAll(partialWriteStatuses.stream().map(writeStatus -> 
writeStatus.getStat()).collect(Collectors.toList()));
+      allWriteStats.addAll(filesPartitionWriteStatus.stream().map(writeStatus 
-> writeStatus.getStat()).collect(Collectors.toList()));
+      client.commitStats(commitTimeOfInterest, allWriteStats, Option.empty(), 
DELTA_COMMIT_ACTION);
+    }
+
+    // validate
+    readFromMetadataTableAndValidateRecords(metadataMetaClient, 
hoodieWriteConfig, filesPartitionExpectedRecordsMap, 
rliPartitionExpectedRecordsMap, commitTimeOfInterest);
+  }
+
+  private void initDataTableWithACommit(HoodieWriteConfig hoodieWriteConfig) 
throws Exception {
+    try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig)) 
{
+      final int numRecords = 100;
+      String newCommitTime = client.createNewInstantTime();
+      insertBatch(hoodieWriteConfig, client, newCommitTime, 
HoodieTimeline.INIT_INSTANT_TS, numRecords, SparkRDDWriteClient::insert,
+          false, true, numRecords, numRecords, 1, Option.empty(), 
INSTANT_GENERATOR);
+    }
+  }
+
+  private void fetchMetadataFileSliceInfo(HoodieTableMetaClient 
metadataMetaClient, List<HoodieFileGroupId> filesPartitionFileGroupIdList,
+                                          List<HoodieFileGroupId> 
nonFilesPartitionsFileGroupIdList, Map<String, List<String>> 
mdtPartitionsFileIdMapping) {
+    try (HoodieTableFileSystemView fsView = 
HoodieTableMetadataUtil.getFileSystemViewForMetadataTable(metadataMetaClient)) {
+      List<FileSlice> fileSlices =
+          
HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient, 
Option.ofNullable(fsView), MetadataPartitionType.FILES.getPartitionPath());
+      
mdtPartitionsFileIdMapping.put(MetadataPartitionType.FILES.getPartitionPath(), 
fileSlices.stream().map(fileSlice -> 
fileSlice.getFileId()).collect(Collectors.toList()));
+      fileSlices.stream().forEach(fileSlice -> {
+        filesPartitionFileGroupIdList.add(new 
HoodieFileGroupId(MetadataPartitionType.FILES.getPartitionPath(), 
fileSlice.getFileId()));
+      });
+
+      fileSlices =
+          
HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient, 
Option.ofNullable(fsView), 
MetadataPartitionType.RECORD_INDEX.getPartitionPath());
+      
mdtPartitionsFileIdMapping.put(MetadataPartitionType.RECORD_INDEX.getPartitionPath(),
 fileSlices.stream().map(fileSlice -> 
fileSlice.getFileId()).collect(Collectors.toList()));
+      fileSlices.stream().forEach(fileSlice -> {
+        nonFilesPartitionsFileGroupIdList.add(new 
HoodieFileGroupId(MetadataPartitionType.RECORD_INDEX.getPartitionPath(), 
fileSlice.getFileId()));
+      });
+    }
+  }
+
+  private void readFromMetadataTableAndValidateRecords(HoodieTableMetaClient 
metadataMetaClient, HoodieWriteConfig hoodieWriteConfig,
+                                                       Map<String, 
HoodieRecord> filesPartitionExpectedRecordsMap,
+                                                       Map<String, 
HoodieRecord> rliPartitionExpectedRecordsMap,
+                                                       String 
validMetadataInstant) throws IOException {
+    // read from MDT and validate all records.
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    metadataMetaClient = HoodieTableMetaClient.reload(metadataMetaClient);
+    try (HoodieTableFileSystemView fsView = 
HoodieTableMetadataUtil.getFileSystemViewForMetadataTable(metadataMetaClient)) {
+      FileSlice filesFileSlice =
+          
HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient, 
Option.ofNullable(fsView), 
MetadataPartitionType.FILES.getPartitionPath()).get(0);
+      readFromMDTFileSliceAndValidate(metadataMetaClient, hoodieWriteConfig, 
filesFileSlice, filesPartitionExpectedRecordsMap, validMetadataInstant);
+
+      FileSlice rliFileSlice =
+          
HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient, 
Option.ofNullable(fsView), 
MetadataPartitionType.RECORD_INDEX.getPartitionPath()).get(0);
+      readFromMDTFileSliceAndValidate(metadataMetaClient, hoodieWriteConfig, 
rliFileSlice, rliPartitionExpectedRecordsMap, validMetadataInstant);
+    }
+  }
+
+  private void readFromMDTFileSliceAndValidate(HoodieTableMetaClient 
metadataMetaClient, HoodieWriteConfig hoodieWriteConfig, FileSlice fileSlice, 
Map<String, HoodieRecord> expectedRecordsMap,
+                                               String validMetadataInstant)
+      throws IOException {
+    // open readers
+    Pair<HoodieSeekingFileReader<?>, HoodieMetadataLogRecordReader> readers = 
openReaders(MetadataPartitionType.FILES.getPartitionPath(), fileSlice, 
metaClient, metadataMetaClient, hoodieWriteConfig,
+        validMetadataInstant);
+    try {
+      // read from the file slice of interest.
+      List<String> sortedKeysForFilesPartition = new 
ArrayList<>(expectedRecordsMap.keySet());
+      Collections.sort(sortedKeysForFilesPartition);
+
+      Map<String, HoodieRecord<HoodieMetadataPayload>> logRecords = 
readLogRecords(readers.getRight(), sortedKeysForFilesPartition);
+
+      Map<String, HoodieRecord<HoodieMetadataPayload>> actualMdtRecordMap =
+          readFromBaseAndMergeWithLogRecords(readers.getKey(), 
sortedKeysForFilesPartition, logRecords, fileSlice.getPartitionPath());
+      assertEquals(actualMdtRecordMap.size(), expectedRecordsMap.size());
+      actualMdtRecordMap.forEach((k, v) -> {

Review Comment:
   Nitpick: use more descriptive names than `k` and `v` if possible, for example:
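   ```java
   // Same assertions as in the diff above, just with descriptive lambda parameter names.
   actualMdtRecordMap.forEach((recordKey, actualRecord) -> {
     assertTrue(expectedRecordsMap.containsKey(recordKey));
     if (!recordKey.equals(RECORDKEY_PARTITION_LIST)) {
       assertEquals(((HoodieMetadataPayload) expectedRecordsMap.get(recordKey).getData()).getFilenames(),
           ((HoodieMetadataPayload) actualRecord.getData()).getFilenames());
     }
   });
   ```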



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkMetadataTableUpsertCommitActionExecutor.java:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieCommitException;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.WorkloadProfile;
+import org.apache.hudi.table.WorkloadStat;
+import 
org.apache.hudi.table.action.deltacommit.SparkUpsertPreppedDeltaCommitActionExecutor;
+
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.storage.StorageLevel;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Upsert commit action executor for Metadata table.
+ *
+ * @param <T>
+ */
+public class SparkMetadataTableUpsertCommitActionExecutor<T> extends 
SparkUpsertPreppedDeltaCommitActionExecutor<T> {
+
+  private static final WorkloadStat PLACEHOLDER_GLOBAL_STAT = new 
WorkloadStat();
+  private final List<HoodieFileGroupId> mdtFileGroupIdList;
+  private final boolean initialCall;
+
+  public SparkMetadataTableUpsertCommitActionExecutor(HoodieSparkEngineContext 
context, HoodieWriteConfig config, HoodieTable table, String instantTime,
+                                                      
HoodieData<HoodieRecord<T>> preppedRecords, List<HoodieFileGroupId> 
mdtFileGroupIdList,
+                                                      boolean initialCall) {
+    super(context, config, table, instantTime, preppedRecords);
+    this.mdtFileGroupIdList = mdtFileGroupIdList;
+    this.initialCall = initialCall;
+  }
+
+  @Override
+  protected boolean shouldPersistInputRecords(JavaRDD<HoodieRecord<T>> 
inputRDD) {
+    return inputRDD.getStorageLevel() == StorageLevel.NONE();
+  }
+
+  @Override
+  protected WorkloadProfile prepareWorkloadProfile(HoodieData<HoodieRecord<T>> 
inputRecordsWithClusteringUpdate) {
+    // create workload profile only when we are writing to FILES partition in 
Metadata table.
+    WorkloadProfile workloadProfile = new 
WorkloadProfile(Pair.of(Collections.emptyMap(), PLACEHOLDER_GLOBAL_STAT));
+    return workloadProfile;
+  }
+
+  protected void saveWorkloadProfileMetadataToInflight(WorkloadProfile 
profile, String instantTime)
+      throws HoodieCommitException {
+    // with streaming writes support, we might write to metadata table 
multiple times for the same instant times.
+    // ie. writeClient.startCommit(t1), writeClient.upsert(batch1, t1), 
writeClient.upsert(batch2, t1), writeClient.commit(t1, ...)
+    // So, here we are generating inflight file only in the last known writes, 
which we know will only have FILES partition.
+    if (!initialCall) {

Review Comment:
   Should this condition be flipped?
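   i.e. did you intend something like the following? Just illustrating the 
question; I'm assuming the guarded body is what writes the inflight file.

   ```java
   // Flipped check, for illustration only: write the inflight file on the first
   // write for the instant rather than on the later ones.
   if (initialCall) {
     super.saveWorkloadProfileMetadataToInflight(profile, instantTime);
   }
   ```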



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDMetadataWriteClient.java:
##########
@@ -0,0 +1,414 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.avro.model.HoodieMetadataRecord;
+import org.apache.hudi.common.config.HoodieCommonConfig;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.io.storage.HoodieIOFactory;
+import org.apache.hudi.io.storage.HoodieSeekingFileReader;
+import org.apache.hudi.metadata.HoodieMetadataLogRecordReader;
+import org.apache.hudi.metadata.HoodieMetadataPayload;
+import org.apache.hudi.metadata.HoodieMetadataWriteUtils;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.testutils.HoodieClientTestBase;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.api.java.JavaRDD;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION;
+import static 
org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR;
+import static org.apache.hudi.common.util.CollectionUtils.toStream;
+import static 
org.apache.hudi.common.util.ConfigUtils.DEFAULT_HUDI_CONFIG_FOR_READER;
+import static 
org.apache.hudi.metadata.HoodieTableMetadata.RECORDKEY_PARTITION_LIST;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestSparkRDDMetadataWriteClient extends HoodieClientTestBase {
+
+  private Random random;
+
+  @BeforeEach
+  public void setUp() throws Exception {
+    super.setUp();
+    random = new Random(0xDEED);
+  }
+
+  @Test
+  public void testWritesViaMetadataWriteClient() throws Exception {
+
+    HoodieWriteConfig hoodieWriteConfig = getConfigBuilder()
+        
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).withMetadataIndexColumnStats(false).withEnableRecordIndex(true)
+            .withRecordIndexFileGroupCount(1, 1).build()).build();
+
+    // trigger end to end write to data table so that metadata table is also 
initialized.
+    initDataTableWithACommit(hoodieWriteConfig);
+
+    // fetch metadata file slice info
+    HoodieWriteConfig mdtWriteConfig = 
HoodieMetadataWriteUtils.createMetadataWriteConfig(hoodieWriteConfig, 
HoodieFailedWritesCleaningPolicy.EAGER);
+    Map<String, List<String>> mdtPartitionsFileIdMapping = new HashMap<>();
+    List<HoodieFileGroupId> nonFilesPartitionFileGroupIdList = new 
ArrayList<>();
+    List<HoodieFileGroupId> filesPartitionFileGroupIdList = new ArrayList<>();
+    HoodieTableMetaClient metadataMetaClient = 
HoodieTableMetaClient.builder().setBasePath(mdtWriteConfig.getBasePath()).setConf(context.getStorageConf()).build();
+    fetchMetadataFileSliceInfo(metadataMetaClient, 
filesPartitionFileGroupIdList, nonFilesPartitionFileGroupIdList, 
mdtPartitionsFileIdMapping);
+
+    List<HoodieRecord> filesPartitionExpectedRecords = new ArrayList<>();
+    Map<String, HoodieRecord> filesPartitionExpectedRecordsMap = new 
HashMap<>();
+    List<String> expectedAllPartitions = new ArrayList<>();
+    List<HoodieRecord> rliRecords = new ArrayList<>();
+    Map<String, HoodieRecord> rliPartitionExpectedRecordsMap = new HashMap<>();
+    String commitTimeOfInterest = null;
+
+    // create Write client to SparkRDDMetadataWriteClient and trigger writes.
+    try (SparkRDDMetadataWriteClient client = new 
SparkRDDMetadataWriteClient(context, mdtWriteConfig)) {
+      commitTimeOfInterest = client.createNewInstantTime();
+
+      // prepare FILES partition records.
+      
prepareFilesPartitionRecords(mdtPartitionsFileIdMapping.get(MetadataPartitionType.FILES.getPartitionPath()).get(0),
+          commitTimeOfInterest, filesPartitionExpectedRecords, 
filesPartitionExpectedRecordsMap, expectedAllPartitions);
+
+      // prepare RLI records.
+      prepareRliRecords(commitTimeOfInterest, mdtPartitionsFileIdMapping, 
expectedAllPartitions, rliRecords, rliPartitionExpectedRecordsMap);
+
+      // ingest RLI records to metadata table.
+      client.startCommitForMetadataTable(metadataMetaClient, 
commitTimeOfInterest, DELTA_COMMIT_ACTION);
+      JavaRDD<WriteStatus> partialWriteStatusesRDD = 
client.upsertPreppedRecords(jsc.parallelize(rliRecords), commitTimeOfInterest, 
Option.of(nonFilesPartitionFileGroupIdList));
+      List<WriteStatus> partialWriteStatuses = 
partialWriteStatusesRDD.collect();
+
+      // validate that the commit is still pending since we are streaming 
write to metadata table.
+      HoodieActiveTimeline reloadedMdtActiveTimeline = 
metadataMetaClient.reloadActiveTimeline();
+      
assertEquals(reloadedMdtActiveTimeline.filterCompletedInstants().getInstants().stream().count(),
 3); // files, rli instantiaton and 1 write to data table.
+      String finalCommitTimeOfInterest = commitTimeOfInterest;
+      
assertTrue(reloadedMdtActiveTimeline.filterInflightsAndRequested().getInstants().stream().anyMatch(instant
 -> instant.requestedTime().equals(finalCommitTimeOfInterest)));

Review Comment:
   If there are two `.inflight` files for a commit, will that throw an 
exception somewhere? Wondering if we want to validate this by listing the 
directory manually and inspecting it.
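   A rough sketch of that manual check, assuming the test base path is a plain 
local-filesystem path and the metadata table's timeline files sit directly under 
`.hoodie` (the layout varies by Hudi version, so adjust accordingly):

   ```java
   // Rough sketch, not tied to a specific Hudi timeline API. Assumes a local FS
   // base path and that instant files live directly under "<mdt base path>/.hoodie".
   java.nio.file.Path timelineDir = java.nio.file.Paths.get(mdtWriteConfig.getBasePath(), ".hoodie");
   try (java.util.stream.Stream<java.nio.file.Path> entries = java.nio.file.Files.list(timelineDir)) {
     long inflightCount = entries
         .map(path -> path.getFileName().toString())
         .filter(name -> name.startsWith(finalCommitTimeOfInterest) && name.endsWith(".inflight"))
         .count();
     assertEquals(1, inflightCount, "expected exactly one .inflight file for " + finalCommitTimeOfInterest);
   }
   ```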



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1408,6 +1411,8 @@ protected void commitInternal(String instantTime, 
Map<String, HoodieData<HoodieR
       bulkInsertAndCommit(writeClient, instantTime, preppedRecordInputs, 
bulkInsertPartitioner);
     } else {
       engineContext.setJobStatus(this.getClass().getSimpleName(), 
String.format("Upserting at %s into metadata table %s", instantTime, 
metadataWriteConfig.getTableName()));
+      // to do: fix the last argument is required so that we can support 
streaming writes to metadata table.
+      // Option.of(partitionFileIdPairs)

Review Comment:
   If this is handled in a later PR, my suggestion is to just remove the TODO 
from here.



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDMetadataWriteClient.java:
##########
@@ -0,0 +1,414 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.avro.model.HoodieMetadataRecord;
+import org.apache.hudi.common.config.HoodieCommonConfig;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.io.storage.HoodieIOFactory;
+import org.apache.hudi.io.storage.HoodieSeekingFileReader;
+import org.apache.hudi.metadata.HoodieMetadataLogRecordReader;
+import org.apache.hudi.metadata.HoodieMetadataPayload;
+import org.apache.hudi.metadata.HoodieMetadataWriteUtils;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.testutils.HoodieClientTestBase;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.api.java.JavaRDD;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION;
+import static 
org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR;
+import static org.apache.hudi.common.util.CollectionUtils.toStream;
+import static 
org.apache.hudi.common.util.ConfigUtils.DEFAULT_HUDI_CONFIG_FOR_READER;
+import static 
org.apache.hudi.metadata.HoodieTableMetadata.RECORDKEY_PARTITION_LIST;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestSparkRDDMetadataWriteClient extends HoodieClientTestBase {
+
+  private Random random;
+
+  @BeforeEach
+  public void setUp() throws Exception {
+    super.setUp();
+    random = new Random(0xDEED);
+  }
+
+  @Test
+  public void testWritesViaMetadataWriteClient() throws Exception {
+
+    HoodieWriteConfig hoodieWriteConfig = getConfigBuilder()
+        
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).withMetadataIndexColumnStats(false).withEnableRecordIndex(true)
+            .withRecordIndexFileGroupCount(1, 1).build()).build();
+
+    // trigger end to end write to data table so that metadata table is also 
initialized.
+    initDataTableWithACommit(hoodieWriteConfig);
+
+    // fetch metadata file slice info
+    HoodieWriteConfig mdtWriteConfig = 
HoodieMetadataWriteUtils.createMetadataWriteConfig(hoodieWriteConfig, 
HoodieFailedWritesCleaningPolicy.EAGER);
+    Map<String, List<String>> mdtPartitionsFileIdMapping = new HashMap<>();
+    List<HoodieFileGroupId> nonFilesPartitionFileGroupIdList = new 
ArrayList<>();
+    List<HoodieFileGroupId> filesPartitionFileGroupIdList = new ArrayList<>();
+    HoodieTableMetaClient metadataMetaClient = 
HoodieTableMetaClient.builder().setBasePath(mdtWriteConfig.getBasePath()).setConf(context.getStorageConf()).build();
+    fetchMetadataFileSliceInfo(metadataMetaClient, 
filesPartitionFileGroupIdList, nonFilesPartitionFileGroupIdList, 
mdtPartitionsFileIdMapping);
+
+    List<HoodieRecord> filesPartitionExpectedRecords = new ArrayList<>();
+    Map<String, HoodieRecord> filesPartitionExpectedRecordsMap = new 
HashMap<>();
+    List<String> expectedAllPartitions = new ArrayList<>();
+    List<HoodieRecord> rliRecords = new ArrayList<>();
+    Map<String, HoodieRecord> rliPartitionExpectedRecordsMap = new HashMap<>();
+    String commitTimeOfInterest = null;
+
+    // create Write client to SparkRDDMetadataWriteClient and trigger writes.
+    try (SparkRDDMetadataWriteClient client = new 
SparkRDDMetadataWriteClient(context, mdtWriteConfig)) {
+      commitTimeOfInterest = client.createNewInstantTime();
+
+      // prepare FILES partition records.
+      
prepareFilesPartitionRecords(mdtPartitionsFileIdMapping.get(MetadataPartitionType.FILES.getPartitionPath()).get(0),
+          commitTimeOfInterest, filesPartitionExpectedRecords, 
filesPartitionExpectedRecordsMap, expectedAllPartitions);
+
+      // prepare RLI records.
+      prepareRliRecords(commitTimeOfInterest, mdtPartitionsFileIdMapping, 
expectedAllPartitions, rliRecords, rliPartitionExpectedRecordsMap);
+
+      // ingest RLI records to metadata table.
+      client.startCommitForMetadataTable(metadataMetaClient, 
commitTimeOfInterest, DELTA_COMMIT_ACTION);
+      JavaRDD<WriteStatus> partialWriteStatusesRDD = 
client.upsertPreppedRecords(jsc.parallelize(rliRecords), commitTimeOfInterest, 
Option.of(nonFilesPartitionFileGroupIdList));
+      List<WriteStatus> partialWriteStatuses = 
partialWriteStatusesRDD.collect();
+
+      // validate that the commit is still pending since we are streaming 
write to metadata table.
+      HoodieActiveTimeline reloadedMdtActiveTimeline = 
metadataMetaClient.reloadActiveTimeline();
+      
assertEquals(reloadedMdtActiveTimeline.filterCompletedInstants().getInstants().stream().count(),
 3); // files, rli instantiaton and 1 write to data table.
+      String finalCommitTimeOfInterest = commitTimeOfInterest;
+      
assertTrue(reloadedMdtActiveTimeline.filterInflightsAndRequested().getInstants().stream().anyMatch(instant
 -> instant.requestedTime().equals(finalCommitTimeOfInterest)));
+
+      // write to FILES partition
+      JavaRDD<WriteStatus> filePartitionWriteStatusesRDD = 
client.upsertPreppedRecords(jsc.parallelize(filesPartitionExpectedRecords), 
commitTimeOfInterest, Option.of(filesPartitionFileGroupIdList));
+      List<WriteStatus> filesPartitionWriteStatus = 
filePartitionWriteStatusesRDD.collect();
+      List<HoodieWriteStat> allWriteStats = new ArrayList<>();
+      allWriteStats.addAll(partialWriteStatuses.stream().map(writeStatus -> 
writeStatus.getStat()).collect(Collectors.toList()));
+      allWriteStats.addAll(filesPartitionWriteStatus.stream().map(writeStatus 
-> writeStatus.getStat()).collect(Collectors.toList()));
+      client.commitStats(commitTimeOfInterest, allWriteStats, Option.empty(), 
DELTA_COMMIT_ACTION);
+    }
+
+    // validate
+    readFromMetadataTableAndValidateRecords(metadataMetaClient, 
hoodieWriteConfig, filesPartitionExpectedRecordsMap, 
rliPartitionExpectedRecordsMap, commitTimeOfInterest);
+  }
+
+  private void initDataTableWithACommit(HoodieWriteConfig hoodieWriteConfig) 
throws Exception {
+    try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig)) 
{
+      final int numRecords = 100;
+      String newCommitTime = client.createNewInstantTime();
+      insertBatch(hoodieWriteConfig, client, newCommitTime, 
HoodieTimeline.INIT_INSTANT_TS, numRecords, SparkRDDWriteClient::insert,
+          false, true, numRecords, numRecords, 1, Option.empty(), 
INSTANT_GENERATOR);
+    }
+  }
+
+  private void fetchMetadataFileSliceInfo(HoodieTableMetaClient 
metadataMetaClient, List<HoodieFileGroupId> filesPartitionFileGroupIdList,
+                                          List<HoodieFileGroupId> 
nonFilesPartitionsFileGroupIdList, Map<String, List<String>> 
mdtPartitionsFileIdMapping) {
+    try (HoodieTableFileSystemView fsView = 
HoodieTableMetadataUtil.getFileSystemViewForMetadataTable(metadataMetaClient)) {
+      List<FileSlice> fileSlices =
+          
HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient, 
Option.ofNullable(fsView), MetadataPartitionType.FILES.getPartitionPath());
+      
mdtPartitionsFileIdMapping.put(MetadataPartitionType.FILES.getPartitionPath(), 
fileSlices.stream().map(fileSlice -> 
fileSlice.getFileId()).collect(Collectors.toList()));
+      fileSlices.stream().forEach(fileSlice -> {
+        filesPartitionFileGroupIdList.add(new 
HoodieFileGroupId(MetadataPartitionType.FILES.getPartitionPath(), 
fileSlice.getFileId()));
+      });
+
+      fileSlices =
+          
HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient, 
Option.ofNullable(fsView), 
MetadataPartitionType.RECORD_INDEX.getPartitionPath());
+      
mdtPartitionsFileIdMapping.put(MetadataPartitionType.RECORD_INDEX.getPartitionPath(),
 fileSlices.stream().map(fileSlice -> 
fileSlice.getFileId()).collect(Collectors.toList()));
+      fileSlices.stream().forEach(fileSlice -> {
+        nonFilesPartitionsFileGroupIdList.add(new 
HoodieFileGroupId(MetadataPartitionType.RECORD_INDEX.getPartitionPath(), 
fileSlice.getFileId()));
+      });
+    }
+  }
+
+  private void readFromMetadataTableAndValidateRecords(HoodieTableMetaClient 
metadataMetaClient, HoodieWriteConfig hoodieWriteConfig,
+                                                       Map<String, 
HoodieRecord> filesPartitionExpectedRecordsMap,
+                                                       Map<String, 
HoodieRecord> rliPartitionExpectedRecordsMap,
+                                                       String 
validMetadataInstant) throws IOException {
+    // read from MDT and validate all records.
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    metadataMetaClient = HoodieTableMetaClient.reload(metadataMetaClient);
+    try (HoodieTableFileSystemView fsView = 
HoodieTableMetadataUtil.getFileSystemViewForMetadataTable(metadataMetaClient)) {
+      FileSlice filesFileSlice =
+          
HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient, 
Option.ofNullable(fsView), 
MetadataPartitionType.FILES.getPartitionPath()).get(0);
+      readFromMDTFileSliceAndValidate(metadataMetaClient, hoodieWriteConfig, 
filesFileSlice, filesPartitionExpectedRecordsMap, validMetadataInstant);
+
+      FileSlice rliFileSlice =
+          
HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient, 
Option.ofNullable(fsView), 
MetadataPartitionType.RECORD_INDEX.getPartitionPath()).get(0);
+      readFromMDTFileSliceAndValidate(metadataMetaClient, hoodieWriteConfig, 
rliFileSlice, rliPartitionExpectedRecordsMap, validMetadataInstant);
+    }
+  }
+
+  private void readFromMDTFileSliceAndValidate(HoodieTableMetaClient 
metadataMetaClient, HoodieWriteConfig hoodieWriteConfig, FileSlice fileSlice, 
Map<String, HoodieRecord> expectedRecordsMap,
+                                               String validMetadataInstant)
+      throws IOException {
+    // open readers
+    Pair<HoodieSeekingFileReader<?>, HoodieMetadataLogRecordReader> readers = 
openReaders(MetadataPartitionType.FILES.getPartitionPath(), fileSlice, 
metaClient, metadataMetaClient, hoodieWriteConfig,
+        validMetadataInstant);
+    try {
+      // read from the file slice of interest.
+      List<String> sortedKeysForFilesPartition = new 
ArrayList<>(expectedRecordsMap.keySet());
+      Collections.sort(sortedKeysForFilesPartition);
+
+      Map<String, HoodieRecord<HoodieMetadataPayload>> logRecords = 
readLogRecords(readers.getRight(), sortedKeysForFilesPartition);
+
+      Map<String, HoodieRecord<HoodieMetadataPayload>> actualMdtRecordMap =
+          readFromBaseAndMergeWithLogRecords(readers.getKey(), 
sortedKeysForFilesPartition, logRecords, fileSlice.getPartitionPath());
+      assertEquals(actualMdtRecordMap.size(), expectedRecordsMap.size());
+      actualMdtRecordMap.forEach((k, v) -> {
+        assertTrue(expectedRecordsMap.containsKey(k));
+        if (!k.equals(RECORDKEY_PARTITION_LIST)) {
+          // ignore __all_partition_records sinec it could have partitions 
from first commit which could be from HoodieTestDatagenerator.
+          assertEquals(((HoodieMetadataPayload) 
expectedRecordsMap.get(k).getData()).getFilenames(), ((HoodieMetadataPayload) 
v.getData()).getFilenames());
+        }
+      });
+    } finally {
+      if (readers.getKey() != null) {
+        readers.getKey().close();
+      }
+      if (readers.getValue() != null) {
+        readers.getValue().close();
+      }
+    }
+  }
+
+  private Map<String, HoodieRecord<HoodieMetadataPayload>> 
readLogRecords(HoodieMetadataLogRecordReader logRecordReader,
+                                                                          
List<String> sortedKeys) {
+    if (logRecordReader == null) {
+      return Collections.emptyMap();
+    }
+
+    return logRecordReader.getRecordsByKeys(sortedKeys);
+  }
+
+  private Map<String, HoodieRecord<HoodieMetadataPayload>> 
readFromBaseAndMergeWithLogRecords(HoodieSeekingFileReader<?> reader,
+                                                                               
               List<String> sortedKeys,
+                                                                               
               Map<String, HoodieRecord<HoodieMetadataPayload>> logRecords,
+                                                                               
               String partitionName) throws IOException {
+    if (reader == null) {
+      // No base file at all
+      return logRecords;
+    }
+
+    Map<String, HoodieRecord<HoodieMetadataPayload>> records =
+        fetchBaseFileRecordsByKeys(reader, sortedKeys, partitionName);
+
+    // Iterate over all provided log-records, merging them into existing 
records
+    logRecords.values().forEach(logRecord ->
+        records.merge(
+            logRecord.getRecordKey(),
+            logRecord,
+            (oldRecord, newRecord) -> {
+              HoodieMetadataPayload mergedPayload = 
newRecord.getData().preCombine(oldRecord.getData());
+              return mergedPayload.isDeleted() ? null : new 
HoodieAvroRecord<>(oldRecord.getKey(), mergedPayload);
+            }
+        ));
+
+    return records;
+  }
+
+  @SuppressWarnings("unchecked")
+  private Map<String, HoodieRecord<HoodieMetadataPayload>> 
fetchBaseFileRecordsByKeys(HoodieSeekingFileReader reader,
+                                                                               
       List<String> sortedKeys,
+                                                                               
       String partitionName) throws IOException {
+    Map<String, HoodieRecord<HoodieMetadataPayload>> result;
+    try (ClosableIterator<HoodieRecord<?>> records = 
reader.getRecordsByKeysIterator(sortedKeys)) {
+      result = toStream(records)
+          .map(record -> {
+            GenericRecord data = (GenericRecord) record.getData();
+            // populateMetaFields is hardcoded to false for the metadata table 
so key must be extracted from the `key` field
+            String recordKey = (String) 
data.get(HoodieMetadataPayload.KEY_FIELD_NAME);
+            return Pair.of(recordKey, composeRecord(data, recordKey, 
partitionName));
+          })
+          .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
+    }
+    return result;
+  }
+
+  private HoodieRecord<HoodieMetadataPayload> composeRecord(GenericRecord 
avroRecord, String recordKey, String partitionName) {
+    return new HoodieAvroRecord<>(new HoodieKey(recordKey, partitionName),
+        new HoodieMetadataPayload(avroRecord, 0L), null);
+  }
+
+  private Pair<HoodieSeekingFileReader<?>, HoodieMetadataLogRecordReader> 
openReaders(String partitionName, FileSlice slice, HoodieTableMetaClient 
dataMetaClient,
+                                                                               
       HoodieTableMetaClient metadataMetaClient, HoodieWriteConfig 
datatableWriteConfig,
+                                                                               
       String validMetadataInstant) {
+    try {
+      HoodieSeekingFileReader<?> baseFileReader = getBaseFileReader(slice, 
metadataMetaClient);
+      List<HoodieLogFile> logFiles = 
slice.getLogFiles().collect(Collectors.toList());
+      HoodieMetadataLogRecordReader logRecordScanner = 
getLogRecordScanner(logFiles, partitionName, dataMetaClient, 
metadataMetaClient, datatableWriteConfig.getMetadataConfig(),
+          validMetadataInstant);
+      return Pair.of(baseFileReader, logRecordScanner);
+    } catch (IOException e) {
+      throw new HoodieIOException("Error opening readers for metadata table 
partition " + partitionName, e);
+    }
+  }
+
+  private HoodieSeekingFileReader<?> getBaseFileReader(FileSlice slice, 
HoodieTableMetaClient metadataMetaClient) throws IOException {

Review Comment:
   Will we be able to replace a lot of this reader code with the 
FileGroupReader?



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDMetadataWriteClient.java:
##########
@@ -0,0 +1,414 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.avro.model.HoodieMetadataRecord;
+import org.apache.hudi.common.config.HoodieCommonConfig;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.io.storage.HoodieIOFactory;
+import org.apache.hudi.io.storage.HoodieSeekingFileReader;
+import org.apache.hudi.metadata.HoodieMetadataLogRecordReader;
+import org.apache.hudi.metadata.HoodieMetadataPayload;
+import org.apache.hudi.metadata.HoodieMetadataWriteUtils;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.testutils.HoodieClientTestBase;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.api.java.JavaRDD;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION;
+import static org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR;
+import static org.apache.hudi.common.util.CollectionUtils.toStream;
+import static org.apache.hudi.common.util.ConfigUtils.DEFAULT_HUDI_CONFIG_FOR_READER;
+import static org.apache.hudi.metadata.HoodieTableMetadata.RECORDKEY_PARTITION_LIST;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestSparkRDDMetadataWriteClient extends HoodieClientTestBase {
+
+  private Random random;
+
+  @BeforeEach
+  public void setUp() throws Exception {
+    super.setUp();
+    random = new Random(0xDEED);
+  }
+
+  @Test
+  public void testWritesViaMetadataWriteClient() throws Exception {
+
+    HoodieWriteConfig hoodieWriteConfig = getConfigBuilder()
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).withMetadataIndexColumnStats(false).withEnableRecordIndex(true)
+            .withRecordIndexFileGroupCount(1, 1).build()).build();
+
+    // trigger end to end write to data table so that metadata table is also initialized.
+    initDataTableWithACommit(hoodieWriteConfig);
+
+    // fetch metadata file slice info
+    HoodieWriteConfig mdtWriteConfig = HoodieMetadataWriteUtils.createMetadataWriteConfig(hoodieWriteConfig, HoodieFailedWritesCleaningPolicy.EAGER);
+    Map<String, List<String>> mdtPartitionsFileIdMapping = new HashMap<>();
+    List<HoodieFileGroupId> nonFilesPartitionFileGroupIdList = new ArrayList<>();
+    List<HoodieFileGroupId> filesPartitionFileGroupIdList = new ArrayList<>();
+    HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setBasePath(mdtWriteConfig.getBasePath()).setConf(context.getStorageConf()).build();
+    fetchMetadataFileSliceInfo(metadataMetaClient, filesPartitionFileGroupIdList, nonFilesPartitionFileGroupIdList, mdtPartitionsFileIdMapping);
+
+    List<HoodieRecord> filesPartitionExpectedRecords = new ArrayList<>();
+    Map<String, HoodieRecord> filesPartitionExpectedRecordsMap = new HashMap<>();
+    List<String> expectedAllPartitions = new ArrayList<>();
+    List<HoodieRecord> rliRecords = new ArrayList<>();
+    Map<String, HoodieRecord> rliPartitionExpectedRecordsMap = new HashMap<>();
+    String commitTimeOfInterest = null;
+
+    // create Write client to SparkRDDMetadataWriteClient and trigger writes.
+    try (SparkRDDMetadataWriteClient client = new SparkRDDMetadataWriteClient(context, mdtWriteConfig)) {
+      commitTimeOfInterest = client.createNewInstantTime();
+
+      // prepare FILES partition records.
+      prepareFilesPartitionRecords(mdtPartitionsFileIdMapping.get(MetadataPartitionType.FILES.getPartitionPath()).get(0),
+          commitTimeOfInterest, filesPartitionExpectedRecords, filesPartitionExpectedRecordsMap, expectedAllPartitions);
+
+      // prepare RLI records.
+      prepareRliRecords(commitTimeOfInterest, mdtPartitionsFileIdMapping, expectedAllPartitions, rliRecords, rliPartitionExpectedRecordsMap);
+
+      // ingest RLI records to metadata table.
+      client.startCommitForMetadataTable(metadataMetaClient, commitTimeOfInterest, DELTA_COMMIT_ACTION);
+      JavaRDD<WriteStatus> partialWriteStatusesRDD = client.upsertPreppedRecords(jsc.parallelize(rliRecords), commitTimeOfInterest, Option.of(nonFilesPartitionFileGroupIdList));
+      List<WriteStatus> partialWriteStatuses = partialWriteStatusesRDD.collect();
+
+      // validate that the commit is still pending since we are streaming write to metadata table.
+      HoodieActiveTimeline reloadedMdtActiveTimeline = metadataMetaClient.reloadActiveTimeline();
+      assertEquals(reloadedMdtActiveTimeline.filterCompletedInstants().getInstants().stream().count(), 3); // files, rli instantiaton and 1 write to data table.
+      String finalCommitTimeOfInterest = commitTimeOfInterest;
+      assertTrue(reloadedMdtActiveTimeline.filterInflightsAndRequested().getInstants().stream().anyMatch(instant -> instant.requestedTime().equals(finalCommitTimeOfInterest)));
+
+      // write to FILES partition
+      JavaRDD<WriteStatus> filePartitionWriteStatusesRDD = client.upsertPreppedRecords(jsc.parallelize(filesPartitionExpectedRecords), commitTimeOfInterest, Option.of(filesPartitionFileGroupIdList));
+      List<WriteStatus> filesPartitionWriteStatus = filePartitionWriteStatusesRDD.collect();
+      List<HoodieWriteStat> allWriteStats = new ArrayList<>();
+      allWriteStats.addAll(partialWriteStatuses.stream().map(writeStatus -> writeStatus.getStat()).collect(Collectors.toList()));
+      allWriteStats.addAll(filesPartitionWriteStatus.stream().map(writeStatus -> writeStatus.getStat()).collect(Collectors.toList()));
+      client.commitStats(commitTimeOfInterest, allWriteStats, Option.empty(), DELTA_COMMIT_ACTION);
+    }
+
+    // validate
+    readFromMetadataTableAndValidateRecords(metadataMetaClient, hoodieWriteConfig, filesPartitionExpectedRecordsMap, rliPartitionExpectedRecordsMap, commitTimeOfInterest);
+  }
+
+  private void initDataTableWithACommit(HoodieWriteConfig hoodieWriteConfig) throws Exception {
+    try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig)) {
+      final int numRecords = 100;
+      String newCommitTime = client.createNewInstantTime();
+      insertBatch(hoodieWriteConfig, client, newCommitTime, HoodieTimeline.INIT_INSTANT_TS, numRecords, SparkRDDWriteClient::insert,
+          false, true, numRecords, numRecords, 1, Option.empty(), INSTANT_GENERATOR);
+    }
+  }
+
+  private void fetchMetadataFileSliceInfo(HoodieTableMetaClient metadataMetaClient, List<HoodieFileGroupId> filesPartitionFileGroupIdList,
+                                          List<HoodieFileGroupId> nonFilesPartitionsFileGroupIdList, Map<String, List<String>> mdtPartitionsFileIdMapping) {
+    try (HoodieTableFileSystemView fsView = HoodieTableMetadataUtil.getFileSystemViewForMetadataTable(metadataMetaClient)) {
+      List<FileSlice> fileSlices =
+          HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient, Option.ofNullable(fsView), MetadataPartitionType.FILES.getPartitionPath());
+      mdtPartitionsFileIdMapping.put(MetadataPartitionType.FILES.getPartitionPath(), fileSlices.stream().map(fileSlice -> fileSlice.getFileId()).collect(Collectors.toList()));
+      fileSlices.stream().forEach(fileSlice -> {

Review Comment:
   instead of collecting in the step above, you can just use `forEach` directly
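   One way this could look (an untested sketch, reusing the variable names from the snippet above and folding both steps into a single pass):

   ```java
   // populate the file-id list and the file-group-id list in one iteration over fileSlices
   List<String> filesPartitionFileIds = new ArrayList<>();
   fileSlices.forEach(fileSlice -> {
     filesPartitionFileIds.add(fileSlice.getFileId());
     filesPartitionFileGroupIdList.add(new HoodieFileGroupId(MetadataPartitionType.FILES.getPartitionPath(), fileSlice.getFileId()));
   });
   mdtPartitionsFileIdMapping.put(MetadataPartitionType.FILES.getPartitionPath(), filesPartitionFileIds);
   ```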



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDMetadataWriteClient.java:
##########
@@ -0,0 +1,414 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.avro.model.HoodieMetadataRecord;
+import org.apache.hudi.common.config.HoodieCommonConfig;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.io.storage.HoodieIOFactory;
+import org.apache.hudi.io.storage.HoodieSeekingFileReader;
+import org.apache.hudi.metadata.HoodieMetadataLogRecordReader;
+import org.apache.hudi.metadata.HoodieMetadataPayload;
+import org.apache.hudi.metadata.HoodieMetadataWriteUtils;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.testutils.HoodieClientTestBase;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.api.java.JavaRDD;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION;
+import static org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR;
+import static org.apache.hudi.common.util.CollectionUtils.toStream;
+import static org.apache.hudi.common.util.ConfigUtils.DEFAULT_HUDI_CONFIG_FOR_READER;
+import static org.apache.hudi.metadata.HoodieTableMetadata.RECORDKEY_PARTITION_LIST;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestSparkRDDMetadataWriteClient extends HoodieClientTestBase {
+
+  private Random random;
+
+  @BeforeEach
+  public void setUp() throws Exception {
+    super.setUp();
+    random = new Random(0xDEED);
+  }
+
+  @Test
+  public void testWritesViaMetadataWriteClient() throws Exception {
+
+    HoodieWriteConfig hoodieWriteConfig = getConfigBuilder()
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).withMetadataIndexColumnStats(false).withEnableRecordIndex(true)
+            .withRecordIndexFileGroupCount(1, 1).build()).build();
+
+    // trigger end to end write to data table so that metadata table is also initialized.
+    initDataTableWithACommit(hoodieWriteConfig);
+
+    // fetch metadata file slice info
+    HoodieWriteConfig mdtWriteConfig = HoodieMetadataWriteUtils.createMetadataWriteConfig(hoodieWriteConfig, HoodieFailedWritesCleaningPolicy.EAGER);
+    Map<String, List<String>> mdtPartitionsFileIdMapping = new HashMap<>();
+    List<HoodieFileGroupId> nonFilesPartitionFileGroupIdList = new ArrayList<>();
+    List<HoodieFileGroupId> filesPartitionFileGroupIdList = new ArrayList<>();
+    HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setBasePath(mdtWriteConfig.getBasePath()).setConf(context.getStorageConf()).build();
+    fetchMetadataFileSliceInfo(metadataMetaClient, filesPartitionFileGroupIdList, nonFilesPartitionFileGroupIdList, mdtPartitionsFileIdMapping);
+
+    List<HoodieRecord> filesPartitionExpectedRecords = new ArrayList<>();
+    Map<String, HoodieRecord> filesPartitionExpectedRecordsMap = new HashMap<>();
+    List<String> expectedAllPartitions = new ArrayList<>();
+    List<HoodieRecord> rliRecords = new ArrayList<>();
+    Map<String, HoodieRecord> rliPartitionExpectedRecordsMap = new HashMap<>();
+    String commitTimeOfInterest = null;
+
+    // create Write client to SparkRDDMetadataWriteClient and trigger writes.
+    try (SparkRDDMetadataWriteClient client = new SparkRDDMetadataWriteClient(context, mdtWriteConfig)) {
+      commitTimeOfInterest = client.createNewInstantTime();
+
+      // prepare FILES partition records.
+      prepareFilesPartitionRecords(mdtPartitionsFileIdMapping.get(MetadataPartitionType.FILES.getPartitionPath()).get(0),
+          commitTimeOfInterest, filesPartitionExpectedRecords, filesPartitionExpectedRecordsMap, expectedAllPartitions);
+
+      // prepare RLI records.
+      prepareRliRecords(commitTimeOfInterest, mdtPartitionsFileIdMapping, expectedAllPartitions, rliRecords, rliPartitionExpectedRecordsMap);
+
+      // ingest RLI records to metadata table.
+      client.startCommitForMetadataTable(metadataMetaClient, commitTimeOfInterest, DELTA_COMMIT_ACTION);
+      JavaRDD<WriteStatus> partialWriteStatusesRDD = client.upsertPreppedRecords(jsc.parallelize(rliRecords), commitTimeOfInterest, Option.of(nonFilesPartitionFileGroupIdList));
+      List<WriteStatus> partialWriteStatuses = partialWriteStatusesRDD.collect();
+
+      // validate that the commit is still pending since we are streaming write to metadata table.
+      HoodieActiveTimeline reloadedMdtActiveTimeline = metadataMetaClient.reloadActiveTimeline();
+      assertEquals(reloadedMdtActiveTimeline.filterCompletedInstants().getInstants().stream().count(), 3); // files, rli instantiaton and 1 write to data table.
+      String finalCommitTimeOfInterest = commitTimeOfInterest;
+      assertTrue(reloadedMdtActiveTimeline.filterInflightsAndRequested().getInstants().stream().anyMatch(instant -> instant.requestedTime().equals(finalCommitTimeOfInterest)));
+
+      // write to FILES partition
+      JavaRDD<WriteStatus> filePartitionWriteStatusesRDD = client.upsertPreppedRecords(jsc.parallelize(filesPartitionExpectedRecords), commitTimeOfInterest, Option.of(filesPartitionFileGroupIdList));
+      List<WriteStatus> filesPartitionWriteStatus = filePartitionWriteStatusesRDD.collect();
+      List<HoodieWriteStat> allWriteStats = new ArrayList<>();
+      allWriteStats.addAll(partialWriteStatuses.stream().map(writeStatus -> writeStatus.getStat()).collect(Collectors.toList()));
+      allWriteStats.addAll(filesPartitionWriteStatus.stream().map(writeStatus -> writeStatus.getStat()).collect(Collectors.toList()));
+      client.commitStats(commitTimeOfInterest, allWriteStats, Option.empty(), DELTA_COMMIT_ACTION);
+    }
+
+    // validate
+    readFromMetadataTableAndValidateRecords(metadataMetaClient, hoodieWriteConfig, filesPartitionExpectedRecordsMap, rliPartitionExpectedRecordsMap, commitTimeOfInterest);
+  }
+
+  private void initDataTableWithACommit(HoodieWriteConfig hoodieWriteConfig) throws Exception {
+    try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig)) {
+      final int numRecords = 100;
+      String newCommitTime = client.createNewInstantTime();
+      insertBatch(hoodieWriteConfig, client, newCommitTime, HoodieTimeline.INIT_INSTANT_TS, numRecords, SparkRDDWriteClient::insert,
+          false, true, numRecords, numRecords, 1, Option.empty(), INSTANT_GENERATOR);
+    }
+  }
+
+  private void fetchMetadataFileSliceInfo(HoodieTableMetaClient metadataMetaClient, List<HoodieFileGroupId> filesPartitionFileGroupIdList,
+                                          List<HoodieFileGroupId> nonFilesPartitionsFileGroupIdList, Map<String, List<String>> mdtPartitionsFileIdMapping) {
+    try (HoodieTableFileSystemView fsView = HoodieTableMetadataUtil.getFileSystemViewForMetadataTable(metadataMetaClient)) {
+      List<FileSlice> fileSlices =
+          HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient, Option.ofNullable(fsView), MetadataPartitionType.FILES.getPartitionPath());
+      mdtPartitionsFileIdMapping.put(MetadataPartitionType.FILES.getPartitionPath(), fileSlices.stream().map(fileSlice -> fileSlice.getFileId()).collect(Collectors.toList()));
+      fileSlices.stream().forEach(fileSlice -> {
+        filesPartitionFileGroupIdList.add(new HoodieFileGroupId(MetadataPartitionType.FILES.getPartitionPath(), fileSlice.getFileId()));
+      });
+
+      fileSlices =
+          HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient, Option.ofNullable(fsView), MetadataPartitionType.RECORD_INDEX.getPartitionPath());
+      mdtPartitionsFileIdMapping.put(MetadataPartitionType.RECORD_INDEX.getPartitionPath(), fileSlices.stream().map(fileSlice -> fileSlice.getFileId()).collect(Collectors.toList()));
+      fileSlices.stream().forEach(fileSlice -> {
+        nonFilesPartitionsFileGroupIdList.add(new HoodieFileGroupId(MetadataPartitionType.RECORD_INDEX.getPartitionPath(), fileSlice.getFileId()));
+      });
+    }
+  }
+
+  private void readFromMetadataTableAndValidateRecords(HoodieTableMetaClient metadataMetaClient, HoodieWriteConfig hoodieWriteConfig,
+                                                       Map<String, HoodieRecord> filesPartitionExpectedRecordsMap,
+                                                       Map<String, HoodieRecord> rliPartitionExpectedRecordsMap,
+                                                       String validMetadataInstant) throws IOException {
+    // read from MDT and validate all records.
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    metadataMetaClient = HoodieTableMetaClient.reload(metadataMetaClient);
+    try (HoodieTableFileSystemView fsView = HoodieTableMetadataUtil.getFileSystemViewForMetadataTable(metadataMetaClient)) {
+      FileSlice filesFileSlice =
+          HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient, Option.ofNullable(fsView), MetadataPartitionType.FILES.getPartitionPath()).get(0);

Review Comment:
   For these checks, instead of doing `get(0)` can we make it handle the list? If not required, can you assert that the list always has a single element?
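   If the single-slice assumption is what the test relies on, a minimal sketch of that assertion (untested, using the names from the snippet above) could be:

   ```java
   List<FileSlice> filesPartitionSlices =
       HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient, Option.ofNullable(fsView), MetadataPartitionType.FILES.getPartitionPath());
   // make the single-file-slice expectation explicit instead of silently taking get(0)
   assertEquals(1, filesPartitionSlices.size(), "expected exactly one file slice in the FILES partition");
   FileSlice filesFileSlice = filesPartitionSlices.get(0);
   ```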



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDMetadataWriteClient.java:
##########
@@ -0,0 +1,414 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.avro.model.HoodieMetadataRecord;
+import org.apache.hudi.common.config.HoodieCommonConfig;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.io.storage.HoodieIOFactory;
+import org.apache.hudi.io.storage.HoodieSeekingFileReader;
+import org.apache.hudi.metadata.HoodieMetadataLogRecordReader;
+import org.apache.hudi.metadata.HoodieMetadataPayload;
+import org.apache.hudi.metadata.HoodieMetadataWriteUtils;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.testutils.HoodieClientTestBase;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.api.java.JavaRDD;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION;
+import static org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR;
+import static org.apache.hudi.common.util.CollectionUtils.toStream;
+import static org.apache.hudi.common.util.ConfigUtils.DEFAULT_HUDI_CONFIG_FOR_READER;
+import static org.apache.hudi.metadata.HoodieTableMetadata.RECORDKEY_PARTITION_LIST;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestSparkRDDMetadataWriteClient extends HoodieClientTestBase {
+
+  private Random random;
+
+  @BeforeEach
+  public void setUp() throws Exception {
+    super.setUp();
+    random = new Random(0xDEED);
+  }
+
+  @Test
+  public void testWritesViaMetadataWriteClient() throws Exception {
+
+    HoodieWriteConfig hoodieWriteConfig = getConfigBuilder()
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).withMetadataIndexColumnStats(false).withEnableRecordIndex(true)
+            .withRecordIndexFileGroupCount(1, 1).build()).build();
+
+    // trigger end to end write to data table so that metadata table is also initialized.
+    initDataTableWithACommit(hoodieWriteConfig);
+
+    // fetch metadata file slice info
+    HoodieWriteConfig mdtWriteConfig = HoodieMetadataWriteUtils.createMetadataWriteConfig(hoodieWriteConfig, HoodieFailedWritesCleaningPolicy.EAGER);
+    Map<String, List<String>> mdtPartitionsFileIdMapping = new HashMap<>();
+    List<HoodieFileGroupId> nonFilesPartitionFileGroupIdList = new ArrayList<>();
+    List<HoodieFileGroupId> filesPartitionFileGroupIdList = new ArrayList<>();
+    HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setBasePath(mdtWriteConfig.getBasePath()).setConf(context.getStorageConf()).build();
+    fetchMetadataFileSliceInfo(metadataMetaClient, filesPartitionFileGroupIdList, nonFilesPartitionFileGroupIdList, mdtPartitionsFileIdMapping);
+
+    List<HoodieRecord> filesPartitionExpectedRecords = new ArrayList<>();
+    Map<String, HoodieRecord> filesPartitionExpectedRecordsMap = new HashMap<>();
+    List<String> expectedAllPartitions = new ArrayList<>();
+    List<HoodieRecord> rliRecords = new ArrayList<>();
+    Map<String, HoodieRecord> rliPartitionExpectedRecordsMap = new HashMap<>();
+    String commitTimeOfInterest = null;
+
+    // create Write client to SparkRDDMetadataWriteClient and trigger writes.
+    try (SparkRDDMetadataWriteClient client = new SparkRDDMetadataWriteClient(context, mdtWriteConfig)) {
+      commitTimeOfInterest = client.createNewInstantTime();
+
+      // prepare FILES partition records.
+      prepareFilesPartitionRecords(mdtPartitionsFileIdMapping.get(MetadataPartitionType.FILES.getPartitionPath()).get(0),
+          commitTimeOfInterest, filesPartitionExpectedRecords, filesPartitionExpectedRecordsMap, expectedAllPartitions);
+
+      // prepare RLI records.
+      prepareRliRecords(commitTimeOfInterest, mdtPartitionsFileIdMapping, expectedAllPartitions, rliRecords, rliPartitionExpectedRecordsMap);
+
+      // ingest RLI records to metadata table.
+      client.startCommitForMetadataTable(metadataMetaClient, commitTimeOfInterest, DELTA_COMMIT_ACTION);
+      JavaRDD<WriteStatus> partialWriteStatusesRDD = client.upsertPreppedRecords(jsc.parallelize(rliRecords), commitTimeOfInterest, Option.of(nonFilesPartitionFileGroupIdList));
+      List<WriteStatus> partialWriteStatuses = partialWriteStatusesRDD.collect();
+
+      // validate that the commit is still pending since we are streaming write to metadata table.
+      HoodieActiveTimeline reloadedMdtActiveTimeline = metadataMetaClient.reloadActiveTimeline();
+      assertEquals(reloadedMdtActiveTimeline.filterCompletedInstants().getInstants().stream().count(), 3); // files, rli instantiaton and 1 write to data table.
+      String finalCommitTimeOfInterest = commitTimeOfInterest;
+      assertTrue(reloadedMdtActiveTimeline.filterInflightsAndRequested().getInstants().stream().anyMatch(instant -> instant.requestedTime().equals(finalCommitTimeOfInterest)));
+
+      // write to FILES partition
+      JavaRDD<WriteStatus> filePartitionWriteStatusesRDD = client.upsertPreppedRecords(jsc.parallelize(filesPartitionExpectedRecords), commitTimeOfInterest, Option.of(filesPartitionFileGroupIdList));
+      List<WriteStatus> filesPartitionWriteStatus = filePartitionWriteStatusesRDD.collect();
+      List<HoodieWriteStat> allWriteStats = new ArrayList<>();
+      allWriteStats.addAll(partialWriteStatuses.stream().map(writeStatus -> writeStatus.getStat()).collect(Collectors.toList()));
+      allWriteStats.addAll(filesPartitionWriteStatus.stream().map(writeStatus -> writeStatus.getStat()).collect(Collectors.toList()));
+      client.commitStats(commitTimeOfInterest, allWriteStats, Option.empty(), DELTA_COMMIT_ACTION);
+    }
+
+    // validate
+    readFromMetadataTableAndValidateRecords(metadataMetaClient, hoodieWriteConfig, filesPartitionExpectedRecordsMap, rliPartitionExpectedRecordsMap, commitTimeOfInterest);
+  }
+
+  private void initDataTableWithACommit(HoodieWriteConfig hoodieWriteConfig) throws Exception {
+    try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig)) {
+      final int numRecords = 100;
+      String newCommitTime = client.createNewInstantTime();
+      insertBatch(hoodieWriteConfig, client, newCommitTime, HoodieTimeline.INIT_INSTANT_TS, numRecords, SparkRDDWriteClient::insert,
+          false, true, numRecords, numRecords, 1, Option.empty(), INSTANT_GENERATOR);
+    }
+  }
+
+  private void fetchMetadataFileSliceInfo(HoodieTableMetaClient metadataMetaClient, List<HoodieFileGroupId> filesPartitionFileGroupIdList,
+                                          List<HoodieFileGroupId> nonFilesPartitionsFileGroupIdList, Map<String, List<String>> mdtPartitionsFileIdMapping) {
+    try (HoodieTableFileSystemView fsView = HoodieTableMetadataUtil.getFileSystemViewForMetadataTable(metadataMetaClient)) {
+      List<FileSlice> fileSlices =
+          HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient, Option.ofNullable(fsView), MetadataPartitionType.FILES.getPartitionPath());
+      mdtPartitionsFileIdMapping.put(MetadataPartitionType.FILES.getPartitionPath(), fileSlices.stream().map(fileSlice -> fileSlice.getFileId()).collect(Collectors.toList()));
+      fileSlices.stream().forEach(fileSlice -> {
+        filesPartitionFileGroupIdList.add(new HoodieFileGroupId(MetadataPartitionType.FILES.getPartitionPath(), fileSlice.getFileId()));
+      });
+
+      fileSlices =
+          HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient, Option.ofNullable(fsView), MetadataPartitionType.RECORD_INDEX.getPartitionPath());
+      mdtPartitionsFileIdMapping.put(MetadataPartitionType.RECORD_INDEX.getPartitionPath(), fileSlices.stream().map(fileSlice -> fileSlice.getFileId()).collect(Collectors.toList()));
+      fileSlices.stream().forEach(fileSlice -> {

Review Comment:
   instead of collecting in the step above, you can just use `forEach` directly
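   Same idea as the earlier comment, e.g. (untested sketch reusing the names from the snippet above):

   ```java
   // single pass over the RECORD_INDEX file slices
   List<String> rliFileIds = new ArrayList<>();
   fileSlices.forEach(fileSlice -> {
     rliFileIds.add(fileSlice.getFileId());
     nonFilesPartitionsFileGroupIdList.add(new HoodieFileGroupId(MetadataPartitionType.RECORD_INDEX.getPartitionPath(), fileSlice.getFileId()));
   });
   mdtPartitionsFileIdMapping.put(MetadataPartitionType.RECORD_INDEX.getPartitionPath(), rliFileIds);
   ```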



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
